1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
#ifndef INLET_CONNECTION_H
#define INLET_CONNECTION_H
#include <map>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/ip/udp.hpp>
#include <boost/thread/shared_mutex.hpp>
#include <boost/thread/thread_only.hpp>
#include <boost/function.hpp>
#include "resolver_impl.h"
#include "cancellation.h"
namespace lslboost { template<class Fn> class function; }
using lslboost::asio::ip::tcp;
using lslboost::asio::ip::udp;
namespace lsl {
/**
* An inlet connection encapsulates the inlet's connection endpoint (for the other components of the inlet)
* and provides a recovery mechanism in case the connection breaks down (e.g., due to a remote computer crash).
*
* When a client of the connection (one of the other inlet components) experiences a connection loss it invokes the
* function try_recover_from_error() which attempts to update the endpoint to a valid state (possible once the stream is back online).
* Since in some cases a client might not be able to detect a connection loss and so would stall forever, the inlet_connection
* maintains a watchdog thread that periodically checks and recovers the connection state. Internally the recovery works by
* using the resolver to find the desired stream on the network again and updating the endpoint information if it has changed.
*/
class inlet_connection: public cancellable_registry {
public:
/**
* Construct a new inlet connection.
* @param info A stream info object (either coming from one of the resolver functions or constructed manually).
* @param recover Try to silently recover lost streams that are recoverable (= those that that have a unique source_id set).
* In all other cases (recover is false or the stream is not recoverable) the stream is declared lost in case of
* a connection breakdown.
*/
inlet_connection(const stream_info_impl &info, bool recover=true);
/// Prepare the connection and its auto-recovery thread.
/// This is called once by the inlet after all other initialization.
void engage();
/// Shut down the connection and all its running operations.
/// This is called once by the inlet before any component is destroyed to
/// ensure orderly shutdown.
void disengage();
// === endpoint information (might change if the connection is rehosted) ===
/// Get the current TCP endpoint from the info (according to our configured protocol).
tcp::endpoint get_tcp_endpoint();
/// Get the current UDP endpoint from the info (according to our configured protocol).
udp::endpoint get_udp_endpoint();
/// Get the current hostname from the info.
std::string get_hostname();
/// Get the TCP protocol type.
tcp tcp_protocol() const { return tcp_protocol_; }
/// Get the UDP protocol type.
udp udp_protocol() const { return udp_protocol_; }
// === connection status info ===
/// True if the connection has been irretrievably lost.
bool lost() const { return lost_; }
/// True if the connection is being shut down.
bool shutdown() const { return shutdown_; }
// === recovery control ===
/// Try to recover the connection.
/// This either blocks until it succeeds or declares the connection as lost (if recovery is disabled),
/// and throws a lost error. Only call this when the connection is found to have broken down
/// (e.g., socket error).
void try_recover_from_error();
// === client status info ===
/// Indicate that a transmission is now active and requesting the watchdog.
/// The recovery watchdog will be inactive while no transmission is requested.
void acquire_watchdog();
/// Indicate that a transmission has just completed.
/// The recovery watchdog will be inactive while no transmission is requested.
void release_watchdog();
/// Inform the connection that content was received from the source (using lsl::lsl_clock()).
/// If a sufficient amount of time has passed since the last call the watchdog thread will
/// try to recover the connection.
void update_receive_time(double t);
/// Register a condition variable that should be notified when a connection is lost
void register_onlost(void *id, lslboost::condition_variable *cond);
/// Unregister a condition variable from the set that is notified on connection loss
void unregister_onlost(void *id);
/// Register a callback function that shall be called when a recovery has been performed
void register_onrecover(void *id, const lslboost::function<void()> &func);
/// Unregister a recovery callback function
void unregister_onrecover(void *id);
// === misc properties of the connected stream ===
/// Get info about the connected stream's type/format.
/// This information is constant and will never change over the lifetime of the connection.
const stream_info_impl &type_info() const { return type_info_; }
/// Get the current stream instance UID (which would be different after a crash and restart of the data source).
std::string current_uid();
/// Get the nominal srate of the endpoint; we assume that this might possibly change between crashes/restarts
/// of the data source under some circumstances (although such behavior would be strongly discouraged).
double current_srate();
private:
/// A thread that periodically checks whether the connection should be recovered.
void watchdog_thread();
/// A (potentially speculative) resolve-and-recover operation.
void try_recover();
// core connection properties
const stream_info_impl type_info_; // static/read-only information of the stream (type & format)
stream_info_impl host_info_; // the volatile information of the stream (addresses and ports); protected by a read/write mutex
lslboost::shared_mutex host_info_mut_; // a mutex to protect the state of the host_info (single-write/multiple-reader)
tcp tcp_protocol_; // the TCP protocol used (according to api_config)
udp udp_protocol_; // the UDP protocol used (according to api_config)
bool recovery_enabled_; // whether we would try to recover the stream if it is lost
bool lost_; // whether the stream is irrecoverably lost (set by try_recover_from_error if recovery is disabled)
// internal watchdog thread (to detect dead connections)
lslboost::thread watchdog_thread_; // re-resolves the current connection speculatively
// things related to the shutdown condition
bool shutdown_; // indicates to threads that we're shutting down
lslboost::mutex shutdown_mut_; // a mutex to protect the shutdown state
lslboost::condition_variable shutdown_cond_; // condition variable indicating that we're shutting down
// things related to recovery
resolver_impl resolver_; // our resolver, in case we need it
lslboost::mutex recovery_mut_; // we allow only one recovery operation at a time
// client status info for recovery & notification purposes
std::map<void*,lslboost::condition_variable*> onlost_; // a group of condition variables that should be notified when the connection is lost
std::map<void*,lslboost::function<void()> > onrecover_; // a group of callback functions that should be invoked once the connection has been recovered
double last_receive_time_; // the last time when we received data from the server
int active_transmissions_; // the number of currently active transmissions (data or info)
lslboost::mutex client_status_mut_; // protects the client status info
lslboost::mutex onrecover_mut_; // protects the onrecover callback map
};
}
#endif