1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
#ifndef RESOLVE_ATTEMPT_UDP_H
#define RESOLVE_ATTEMPT_UDP_H
#include "stream_info_impl.h"
#include "cancellation.h"
#include <map>
#include <boost/asio/steady_timer.hpp>
#include <boost/asio/ip/udp.hpp>
#include <boost/smart_ptr/enable_shared_from_this.hpp>
namespace lslboost { class mutex; }
namespace lslboost { namespace system { class error_code; } }
using lslboost::asio::ip::udp;
using lslboost::system::error_code;
namespace lsl {
/// A container for resolve results (map from stream instance UID onto (stream_info,receive-time)).
typedef std::map<std::string,std::pair<stream_info_impl,double> > result_container;
/**
* An asynchronous resolve attempt for a single query targeted at a set of endpoints, via UDP.
* A resolve attempt is an asynchronous operation submitted to an IO object, which amounts
* to a sequence of query packet sends (one for each endpoint in the list) and a sequence of
* result packet receives. The operation will wait for return packets until either a particular
* timeout has been reached or until it is cancelled via the cancel() method.
*/
class resolve_attempt_udp: public cancellable_obj, public lslboost::enable_shared_from_this<resolve_attempt_udp> {
typedef std::vector<udp::endpoint> endpoint_list;
public:
/**
* Instantiate and set up a new resolve attempt.
* @param io The io_context that will run the async operations.
* @param protocol The protocol (either udp::v4() or udp::v6()) to use for communications;
* only the subset of target addresses matching this protocol will be considered.
* @param targets A list of udp::endpoint that should be targeted by this query.
* @param query The query string to send (usually a set of conditions on the properties of the stream info that should be searched,
* for example "name='BioSemi' and type='EEG'" (without the outer ""). See stream_info_impl::matches_query() for the
* definition of a query.
* @param results Reference to a container into which results are stored; potentially shared with other parallel resolve operations.
* Since this is not thread-safe all operations modifying this must run on the same single-threaded IO service.
* @param results_mut Reference to a mutex that protects the container.
* @param cancel_after The time duration after which the attempt is automatically cancelled, i.e. the receives are ended.
* @param registry A registry where the attempt can register itself as active so it can be cancelled during shutdown.
*/
resolve_attempt_udp(lslboost::asio::io_context &io, const udp &protocol, const std::vector<udp::endpoint> &targets, const std::string &query, result_container &results, lslboost::mutex &results_mut, double cancel_after=5.0, cancellable_registry *registry=NULL);
/// Destructor
~resolve_attempt_udp();
/// Start the attempt asynchronously.
void begin();
/// Cancel operations asynchronously, and destructively.
/// Note that this mostly serves to expedite the destruction of the object,
/// which would happen anyway after some time.
/// As the attempt instance is owned by the handler chains
/// the cancellation eventually leads to the destruction of the object.
void cancel();
private:
// === send and receive handlers ===
/// This function asks to receive the next result packet.
void receive_next_result();
/// Thos function starts an async send operation for the given current endpoint.
void send_next_query(endpoint_list::const_iterator i);
/// Handler that gets called when a receive has completed.
void handle_receive_outcome(error_code err, std::size_t len);
/// Handler that gets called when a send has completed.
void handle_send_outcome(endpoint_list::const_iterator i, error_code err);
// === cancellation ===
/// Handler that gets called when the give up timeout has expired.
void handle_timeout(error_code err);
/// Cancel the outstanding operations.
void do_cancel();
// data shared with the resolver_impl
lslboost::asio::io_context &io_; // reference to the IO service that executes our actions
result_container &results_; // shared result container
lslboost::mutex &results_mut_; // shared mutex that protects the results
// constant over the lifetime of this attempt
double cancel_after_; // the timeout for giving up
bool cancelled_; // whether the operation has been cancelled
std::vector<udp::endpoint> targets_;// list of endpoints that should receive the query
std::string query_; // the query string
std::string query_msg_; // the query message that we're sending
std::string query_id_; // the (more or less) unique id for this query
// data maintained/modified across handler invocations
udp::endpoint remote_endpoint_; // the endpoint from which we received the last result
char resultbuf_[65536]; // holds a single result received from the net
// IO objects
udp::socket unicast_socket_; // socket to send data over (for unicasts)
udp::socket broadcast_socket_; // socket to send data over (for broadcasts)
udp::socket multicast_socket_; // socket to send data over (for multicasts)
udp::socket recv_socket_; // socket to receive replies (always unicast)
lslboost::asio::steady_timer cancel_timer_; // timer to schedule the cancel action
};
}
#endif