1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
#ifndef TIME_RECEIVER_H
#define TIME_RECEIVER_H
#include <limits>
#include <boost/asio/steady_timer.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/ip/udp.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/thread_only.hpp>
#include <boost/random/mersenne_twister.hpp>
using lslboost::asio::ip::udp;
using lslboost::asio::steady_timer;
using lslboost::system::error_code;
namespace lsl {
class inlet_connection;
class api_config;
/// list of time estimates with error bounds
typedef std::vector<std::pair<double,double> > estimate_list;
/// pointer to a string
typedef lslboost::shared_ptr<std::string> string_p;
/// internally used constant to represent an unassigned time offset
const double NOT_ASSIGNED = std::numeric_limits<double>::max();
/// Internal class of an inlet that is responsible for retrieving the time-correction data of the inlet.
/// The actual communication runs in an internal background thread, while the public function (time_correction()) waits for the thread to finish.
/// The public function has an optional timeout after which it gives up, while the background thread
/// continues to do its job (so the next public-function call may succeed within the timeout).
/// The background thread terminates only if the time_receiver is destroyed or the underlying connection is lost or shut down.
class time_receiver {
public:
/// Construct a new time receiver for a given connection.
time_receiver(inlet_connection &conn);
/// Destructor. Stops the background activities.
~time_receiver();
/**
* Retrieve an estimated time correction offset for the given stream.
* The first call to this function takes several msec for an initial estimate, subsequent calls are instantaneous.
* The correction offset is periodically re-estimated in the background (once every few sec.).
* @remote_time Time of this measurment on remote computer
* @uncertainty Maximum uncertainty of this measurement (maps to round-trip-time).
* @timeout Timeout for first time-correction estimate.
* @return The time correction estimate.
* @throws timeout_error If the initial estimate times out.
*/
double time_correction(double timeout=2);
double time_correction(double *remote_time, double *uncertainty, double timeout);
/// Determine whether the clock was (potentially) reset since the last call to was_reset()
/// This can happen if the stream got lost (e.g., app crash) and the computer got restarted or swapped out
bool was_reset();
private:
/// The time reader / updater thread.
void time_thread();
/// Start a new multi-packet exchange for time estimation
void start_time_estimation();
/// Handler that gets called once the next time estimation shall be scheduled
void next_estimate_scheduled(error_code err);
/// Send the next packet in an exchange
void send_next_packet(int packet_num);
/// Handler that gets called once the sending of a packet has completed
void handle_send_outcome(string_p msg_buffer, error_code err);
/// Handler that gets called when the next packet shall be scheduled
void next_packet_scheduled(int packet_num, error_code err);
/// Request reception of the next time packet
void receive_next_packet();
/// Handler that gets called once reception of a time packet has completed
void handle_receive_outcome(error_code err, std::size_t len);
/// Handlers that gets called once the time estimation results shall be aggregated.
void result_aggregation_scheduled(error_code err);
/// Function polled by the condition variable
bool timeoffset_available();
/// Ensures that the time-offset is reset when the underlying connection is recovered (e.g., switches to another host)
void reset_timeoffset_on_recovery();
// the underlying connection
inlet_connection &conn_; // our connection
// background reader thread and the data generated by it
lslboost::thread time_thread_; // updates time offset
bool was_reset_; // whether the clock was reset
double timeoffset_; // the current time offset (or NOT_ASSIGNED if not yet assigned)
double remote_time_; // remote computer time at the specified timeoffset_
double uncertainty_; // round trip time (a.k.a. uncertainty) at the specficied timeoffset_
lslboost::mutex timeoffset_mut_; // mutex to protect the time offset
lslboost::condition_variable timeoffset_upd_; // condition variable to indicate that an update for the time offset is available
// data used internally by the background thread
const api_config *cfg_; // the configuration object
lslboost::asio::io_context time_io_; // an IO service for async time operations
char recv_buffer_[16384]; // a buffer to hold inbound packet contents
lslboost::random::mt19937 rng_; // a random number generator
udp::socket time_sock_; // the socket through which the time thread communicates
steady_timer next_estimate_; // schedule the next time estimate
steady_timer aggregate_results_; // schedules result aggregation
steady_timer next_packet_; // schedules the next packet transfer
udp::endpoint remote_endpoint_; // a dummy endpoint
estimate_list estimates_; // a vector of time estimates collected so far during the current exchange
estimate_list estimate_times_; // a vector of the local time and the remote time at a given estimate
int current_wave_id_; // an id for the current wave of time packets
};
}
#endif