#include "time_receiver.h"
#include "api_config.h"
#include <boost/asio/placeholders.hpp>
#include <iostream>
#include <boost/asio/placeholders.hpp>
#include <boost/bind.hpp>
#include "inlet_connection.h"
#include "socket_utils.h"
using namespace lsl;
using namespace lslboost::asio;
time_receiver::time_receiver(inlet_connection &conn): conn_(conn), was_reset_(false), timeoffset_(std::numeric_limits<double>::max()),
remote_time_(std::numeric_limits<double>::max()), uncertainty_(std::numeric_limits<double>::max()),
cfg_(api_config::get_instance()), time_sock_(time_io_), next_estimate_(time_io_), aggregate_results_(time_io_), next_packet_(time_io_) {
conn_.register_onlost(this,&timeoffset_upd_);
conn_.register_onrecover(this,lslboost::bind(&time_receiver::reset_timeoffset_on_recovery,this));
time_sock_.open(conn_.udp_protocol());
}
time_receiver::~time_receiver() {
try {
conn_.unregister_onrecover(this);
conn_.unregister_onlost(this);
time_io_.stop();
if (time_thread_.joinable())
time_thread_.join();
}
catch(std::exception &e) {
std::cerr << "Unexpected error during destruction of a time_receiver: " << e.what() << std::endl;
}
catch(...) {
std::cerr << "Severe error during time receiver shutdown." << std::endl;
}
}
double time_receiver::time_correction(double timeout) {
double remote_time, uncertainty;
return time_correction(&remote_time, &uncertainty, timeout);
}
double time_receiver::time_correction(double *remote_time, double *uncertainty, double timeout ) {
lslboost::unique_lock<lslboost::mutex> lock(timeoffset_mut_);
if (!timeoffset_available()) {
if (!time_thread_.joinable())
time_thread_ = lslboost::thread(&time_receiver::time_thread,this);
if (timeout >= FOREVER)
timeoffset_upd_.wait(lock, lslboost::bind(&time_receiver::timeoffset_available,this));
else
if (!timeoffset_upd_.wait_for(lock, lslboost::chrono::duration<double>(timeout), lslboost::bind(&time_receiver::timeoffset_available,this)))
throw timeout_error("The time_correction() operation timed out.");
}
if (conn_.lost())
throw lost_error("The stream read by this inlet has been lost. To recover, you need to re-resolve the source and re-create the inlet.");
*remote_time = remote_time_;
*uncertainty = uncertainty_;
return timeoffset_;
}
bool time_receiver::was_reset() {
lslboost::unique_lock<lslboost::mutex> lock(timeoffset_mut_);
bool result = was_reset_;
was_reset_ = false;
return result;
}
void time_receiver::time_thread() {
conn_.acquire_watchdog();
try {
start_time_estimation();
while (true) {
try {
time_io_.run();
break;
} catch(std::exception &e) {
std::cerr << "Hiccup during time_thread io_context processing: " << e.what() << std::endl;
}
}
} catch(std::exception &e) {
std::cerr << "time_thread failed unexpectedly with message: " << e.what() << std::endl;
}
conn_.release_watchdog();
}
void time_receiver::start_time_estimation() {
estimates_.clear();
estimate_times_.clear();
current_wave_id_ = rng_();
send_next_packet(1);
receive_next_packet();
aggregate_results_.expires_after(timeout_sec(cfg_->time_probe_max_rtt() + cfg_->time_probe_interval()*cfg_->time_probe_count()));
aggregate_results_.async_wait(lslboost::bind(&time_receiver::result_aggregation_scheduled,this,placeholders::error));
next_estimate_.expires_after(timeout_sec(cfg_->time_update_interval()));
next_estimate_.async_wait(lslboost::bind(&time_receiver::next_estimate_scheduled,this,placeholders::error));
}
void time_receiver::next_estimate_scheduled(error_code err) {
if (err != error::operation_aborted)
start_time_estimation();
}
void time_receiver::send_next_packet(int packet_num) {
try {
std::ostringstream request; request.precision(16); request << "LSL:timedata\r\n" << current_wave_id_ << " " << lsl_clock() << "\r\n";
string_p msg_buffer(new std::string(request.str()));
time_sock_.async_send_to(lslboost::asio::buffer(*msg_buffer), conn_.get_udp_endpoint(),
lslboost::bind(&time_receiver::handle_send_outcome,this,msg_buffer,placeholders::error));
} catch(std::exception &e) {
std::cerr << "Error trying to send a time packet: " << e.what() << std::endl;
}
if (packet_num < cfg_->time_probe_count()) {
next_packet_.expires_after(timeout_sec(cfg_->time_probe_interval()));
next_packet_.async_wait(lslboost::bind(&time_receiver::next_packet_scheduled,this,++packet_num,placeholders::error));
}
}
void time_receiver::handle_send_outcome(string_p, error_code) { }
void time_receiver::next_packet_scheduled(int packet_num, error_code err) {
if (!err)
send_next_packet(packet_num);
}
void time_receiver::receive_next_packet() {
time_sock_.async_receive_from(lslboost::asio::buffer(recv_buffer_), remote_endpoint_,
lslboost::bind(&time_receiver::handle_receive_outcome, this, placeholders::error, placeholders::bytes_transferred));
}
void time_receiver::handle_receive_outcome(error_code err, std::size_t len) {
try {
if (!err) {
std::istringstream is(std::string(recv_buffer_,len));
int wave_id; is >> wave_id;
if (wave_id == current_wave_id_) {
double t0, t1, t2, t3 = lsl_clock();
is >> t0 >> t1 >> t2;
double rtt = (t3-t0) - (t2-t1); double offset = ((t1-t0) + (t2-t3)) / 2; estimates_.push_back(std::make_pair(rtt,offset));
estimate_times_.push_back(std::make_pair((t3 + t0)/2.0, (t2 + t1)/2.0)); }
}
} catch(std::exception &e) {
std::cerr << "Error while processing a time estimation return packet: " << e.what() << std::endl;
}
if (err != error::operation_aborted)
receive_next_packet();
}
void time_receiver::result_aggregation_scheduled(error_code err) {
if (!err) {
if ((int)estimates_.size() >= cfg_->time_update_minprobes()) {
double best_offset=0, best_rtt=FOREVER;
double best_remote_time=0;
for (std::size_t k = 0; k < estimates_.size(); k++) {
if (estimates_[k].first < best_rtt) {
best_rtt = estimates_[k].first;
best_offset = estimates_[k].second;
best_remote_time = estimate_times_[k].second;
}
}
{
lslboost::lock_guard<lslboost::mutex> lock(timeoffset_mut_);
uncertainty_ = best_rtt;
timeoffset_ = -best_offset;
remote_time_ = best_remote_time;
}
timeoffset_upd_.notify_all();
}
}
}
bool time_receiver::timeoffset_available() { return (timeoffset_ != std::numeric_limits<double>::max()) || conn_.lost(); }
void time_receiver::reset_timeoffset_on_recovery() {
lslboost::lock_guard<lslboost::mutex> lock(timeoffset_mut_);
if (timeoffset_ != NOT_ASSIGNED)
was_reset_ = true;
timeoffset_ = NOT_ASSIGNED;
}