lsl-sys 0.1.1

Low-level bindings to the system liblsl library (lab streaming layer).
#include <iostream>
#include <boost/asio/ip/multicast.hpp>
#include <boost/asio/placeholders.hpp>
#include <boost/bind.hpp>
#include <boost/algorithm/string/trim.hpp>
#include "api_config.h"
#include "cast.h"
#include "resolve_attempt_udp.h"
#include "resolver_impl.h"
#include "socket_utils.h"


// === implementation of the resolver_burst_udp class ===

using namespace lsl;
using namespace lslboost::asio;

/**
* Instantiate and set up a new resolve attempt.
* @param io The io_context that will run the async operations.
* @param protocol The protocol (either udp::v4() or udp::v6()) to use for communications; 
*				  only the subset of target addresses matching this protocol will be considered.
* @param targets A list of udp::endpoint that should be targetd by this query.
* @param query The query string to send (usually a set of conditions on the properties of the stream info that should be searched,
*              for example "name='BioSemi' and type='EEG'" (without the outer ""). See stream_info_impl::matches_query() for the 
*			   definition of a query.
* @param results Reference to a container into which results are stored; potentially shared with other parallel resolve operations.
*			     Since this is not thread-safe all operations modifying this must run on the same single-threaded IO service.
* @param results_mut Reference to a mutex that protects the container.
* @param cancel_after The time duration after which the attempt is automatically cancelled, i.e. the receives are ended.
* @param registry A registry where the attempt can register itself as active so it can be cancelled during shutdown.
*/
resolve_attempt_udp::resolve_attempt_udp(io_context &io, const udp &protocol,
	const std::vector<udp::endpoint> &targets, const std::string &query, result_container &results,
	lslboost::mutex &results_mut, double cancel_after, cancellable_registry *registry)
	: io_(io), results_(results), results_mut_(results_mut), cancel_after_(cancel_after),
	  cancelled_(false), targets_(targets), query_(query), unicast_socket_(io),
	  broadcast_socket_(io), multicast_socket_(io), recv_socket_(io), cancel_timer_(io) {
	// open the sockets that we might need
	recv_socket_.open(protocol);
	try {
		bind_port_in_range(recv_socket_,protocol);
	} catch(std::exception &e) {
		std::cerr << "Could not bind to a port in the configured port range; using a randomly assigned one: " << e.what() << std::endl;
	}
	unicast_socket_.open(protocol);
	try {
		broadcast_socket_.open(protocol);
		broadcast_socket_.set_option(socket_base::broadcast(true));
	} catch(std::exception &e) {
		std::cerr << "Cannot open UDP broadcast socket for resolves: " << e.what() << std::endl;
	}
	try {
		multicast_socket_.open(protocol);
		multicast_socket_.set_option(ip::multicast::hops(api_config::get_instance()->multicast_ttl()));
	} catch(std::exception &e) {
		std::cerr << "Cannot open UDP multicast socket for resolves: " << e.what() << std::endl;
	}

	// precalc the query id (hash of the query string, as string)
	query_id_ = to_string(lslboost::hash<std::string>()(query));
	// precalc the query message
	std::ostringstream os; os.precision(16);
	os << "LSL:shortinfo\r\n";
	os << query_ << "\r\n";
	os << recv_socket_.local_endpoint().port() << " " << query_id_ << "\r\n";
	query_msg_ = os.str();

	// register ourselves as a candidate for cancellation
	if (registry)
		register_at(registry);
}

resolve_attempt_udp::~resolve_attempt_udp() {
	// make sure that the cancel is unregistered before the resolve attempt is being deleted...
	unregister_from_all();
}

// === externally-triggered asynchronous commands ===

/// Start the attempt.
void resolve_attempt_udp::begin() {
	// initiate the result gathering chain
	receive_next_result();
	// initiate the send chain
	send_next_query(targets_.begin());

	// also initiate the cancel event, if desired
	if (cancel_after_ != FOREVER) {
		cancel_timer_.expires_after(timeout_sec(cancel_after_));
		cancel_timer_.async_wait(lslboost::bind(&resolve_attempt_udp::handle_timeout,shared_from_this(),placeholders::error));
	}
}

/// Post a command to cancel all operations.
void resolve_attempt_udp::cancel() {
	post(io_, lslboost::bind(&resolve_attempt_udp::do_cancel, shared_from_this()));
}


// === receive loop ===

/// This function asks to receive the next result packet
void resolve_attempt_udp::receive_next_result() {
	recv_socket_.async_receive_from(buffer(resultbuf_),remote_endpoint_,
		lslboost::bind(&resolve_attempt_udp::handle_receive_outcome,shared_from_this(),placeholders::error,placeholders::bytes_transferred));
}

/// Handler that gets called when a receive has completed
void resolve_attempt_udp::handle_receive_outcome(error_code err, std::size_t len) {
	if (!cancelled_ && err != error::operation_aborted && err != error::not_connected && err != error::not_socket) {
		if (!err) {
			try {
				// first parse & check the query id
				std::istringstream is(std::string(resultbuf_,len));
				std::string returned_id; getline(is,returned_id); lslboost::trim(returned_id);
				if (returned_id == query_id_) {
					// parse the rest of the query into a stream_info
					stream_info_impl info;
					std::ostringstream os; os << is.rdbuf();
					info.from_shortinfo_message(os.str());
					std::string uid = info.uid();
					{
						// update the results
						lslboost::lock_guard<lslboost::mutex> lock(results_mut_);
						if (results_.find(uid) == results_.end())
							results_[uid] = std::make_pair(info,lsl_clock()); // insert new result
						else							
							results_[uid].second = lsl_clock(); // update only the receive time
						// ... also update the address associated with the result (but don't override the 
						// address of an earlier record for this stream since this would be the faster route)
						if (remote_endpoint_.address().is_v4()) {
							if (results_[uid].first.v4address().empty())
								results_[uid].first.v4address(remote_endpoint_.address().to_string());
						} else {
							if (results_[uid].first.v6address().empty())
								results_[uid].first.v6address(remote_endpoint_.address().to_string());
						}
					}
				}
			} catch(std::exception &e) {
				std::cerr << "resolve_attempt_udp: hiccup while processing the received data: " << e.what() << std::endl;
			}
		}
		// ask for the next result
		receive_next_result();
	}
}


// === send loop ===

/// Thus function starts an async send operation for the given current endpoint
void resolve_attempt_udp::send_next_query(endpoint_list::const_iterator i) {
	if (i != targets_.end() && !cancelled_) {
		udp::endpoint ep(*i);
		// endpoint matches our active protocol?
		if (ep.protocol() == recv_socket_.local_endpoint().protocol()) {
			// select socket to use
			udp::socket &sock =
				(ep.address() == ip::address_v4::broadcast())
					? broadcast_socket_
					: (ep.address().is_multicast() ? multicast_socket_ : unicast_socket_);
			// and send the query over it
			sock.async_send_to(lslboost::asio::buffer(query_msg_), ep,
				lslboost::bind(&resolve_attempt_udp::handle_send_outcome,shared_from_this(),++i,placeholders::error));
		} else
			// otherwise just go directly to the next query
			send_next_query(++i);
	}
}

/// Handler that gets called when a send has completed
void resolve_attempt_udp::handle_send_outcome(endpoint_list::const_iterator i, error_code err) {
	if (!cancelled_ && err != error::operation_aborted && err != error::not_connected && err != error::not_socket)
		send_next_query(i);
}


// === cancellation logic ===

/// Give up the current operations.
void resolve_attempt_udp::handle_timeout(error_code err) {
	if (!err)
		do_cancel();
}

/// Cancel the outstanding operations.
void resolve_attempt_udp::do_cancel() {
	try {
		cancelled_ = true;
		if (unicast_socket_.is_open())
			unicast_socket_.close();
		if (multicast_socket_.is_open())
			multicast_socket_.close();
		if (broadcast_socket_.is_open())
			broadcast_socket_.close();
		if (recv_socket_.is_open())
			recv_socket_.close();
		cancel_timer_.cancel();
	} catch(std::exception &e) {
		std::cerr << "Unexpected error while trying to cancel operations of resolve_attempt_udp: " << e.what() << std::endl;
	}
}