1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
#ifndef RESOLVER_IMPL_H
#define RESOLVER_IMPL_H
#include "common.h"
#include "forward.h"
#include "stream_info_impl.h"
#include "cancellation.h"
#include <boost/asio/steady_timer.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/ip/udp.hpp>
#include <boost/smart_ptr/shared_ptr.hpp>
#include <boost/noncopyable.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/thread_only.hpp>
using lslboost::asio::ip::udp;
using lslboost::asio::ip::tcp;
using lslboost::system::error_code;
namespace lsl {
class api_config;
/// A container for resolve results (map from stream instance UID onto (stream_info,receive-time)).
typedef std::map<std::string,std::pair<stream_info_impl,double> > result_container;
/**
* A stream resolver object.
* Maintains the necessary resources for a resolve process,
* used by the free-standing resolve functions, the continuous_resolver class, and the inlets.
*
* A resolver instance can be operated in two different ways:
* 1) In one shot: The resolver is queried one or more times by calling resolve_oneshot().
* 2) Continuously: First a background query process is started that keeps updating a results list
* by calling resolve_continuous() and the list is retrieved in parallel when desired using
* results(). In this case a new resolver instance must be created to issue a new query.
*/
class resolver_impl: public cancellable_registry, public lslboost::noncopyable {
public:
/**
* Instantiate a new resolver and configure timing parameters.
* A note on resolution logic. If KnownPeers in the api_config is empty, a new multicast wave will be scheduled every mcast_min_rtt
* (until a timeout expires or the desired number of streams has been resolved).If KnownPeers is non-empty, a multicast wave and a
* unicast wave will be schedule in alternation. The spacing between waves will be no shorter than the respective minimum RTTs.
* In continuous mode a special set of timings that is somewhat more lax is used (see API config).
*/
resolver_impl();
/// Destructor.
/// Cancels any ongoing processes and waits until they finish.
~resolver_impl();
/**
* Resolve a query string into a list of matching stream_info's on the network.
* Blocks until at least the minimum number of streams has been resolved, or the timeout fires, or the resolve has been cancelled.
* @param query The query string to send (usually a set of conditions on the properties of the stream info that should be searched,
* for example "name='BioSemi' and type='EEG'" (without the outer ""). See stream_info_impl::matches_query() for the
* definition of a query.
* @param minimum The minimum number of unique streams that should be resolved before this function may to return.
* @param timeout The timeout after which this function is forced to return (even if it did not produce the desired number of results).
* @param minimum_time Search for matching streams for at least this much time (e.g., if multiple streams may be present).
*/
std::vector<stream_info_impl> resolve_oneshot(const std::string &query, int minimum=0, double timeout=FOREVER, double minimum_time=0.0);
/**
* Starts a background thread that resolves a query string and periodically updates the list of present streams.
* After this, the resolver can *not* be repurposed for other queries or for oneshot operation (a new instance needs to be created for that).
* @param query The query string to send (usually a set of conditions on the properties of the stream info that should be searched,
* for example "name='BioSemi' and type='EEG'" (without the outer ""). See stream_info_impl::matches_query() for the
* definition of a query.
* @param forget_after If a stream vanishes from the network (e.g., because it was shut down), it will be pruned from the
* list this many seconds after it was last seen.
* @param prune_interval Prune dead streams from the list every this many seconds.
*/
void resolve_continuous(const std::string &query, double forget_after=5.0);
/// Get the current set of results (e.g., during continuous operation).
std::vector<stream_info_impl> results();
/// Tear down any ongoing operations and render the resolver unusable.
/// This can be used to cancel a blocking resolve_oneshot() from another thread (e.g., to initiate teardown of the object).
void cancel();
private:
/// This function starts a new wave of resolves.
void next_resolve_wave();
/// Start a new resolver attempt on the multicast hosts.
void udp_multicast_burst();
/// Start a new resolver attempt on the known peers.
void udp_unicast_burst(error_code err);
/// This handler is called when the overall timeout (if any) expires.
void resolve_timeout_expired(error_code err);
/// This handler is called when the wave timeout (if any) expires.
void wave_timeout_expired(error_code err);
/// Cancel the currently ongoing resolve, if any.
void cancel_ongoing_resolve();
// constants (mostly config-deduced)
const api_config *cfg_; // pointer to our configuration object
std::vector<udp> udp_protocols_; // UDP protocols under consideration
std::vector<tcp> tcp_protocols_; // TCP protocols under consideration
std::vector<udp::endpoint> mcast_endpoints_; // the list of multicast endpoints under consideration
std::vector<udp::endpoint> ucast_endpoints_; // the list of per-host UDP endpoints under consideration
// things related to cancellation
bool cancelled_; // if set, no more resolves can be started (destructively cancelled).
bool expired_; // if set, ongoing operations will finished quickly
// reinitialized for each query
std::string query_; // our current query string
int minimum_; // the minimum number of results that we want
double forget_after_; // forget results that are older than this (continuous operation only)
double wait_until_; // wait until this point in time before returning results (optional to allow for returning potentially more than a minimum number of results)
bool fast_mode_; // whether this is a fast resolve: determines the rate at which the query is repeated
result_container results_; // results are stored here
lslboost::mutex results_mut_; // a mutex that protects the results map
// io objects
io_context_p io_; // our IO service
lslboost::shared_ptr<lslboost::thread> background_io_;// a thread that runs background IO if we are performing a resolve_continuous
lslboost::asio::steady_timer resolve_timeout_expired_; // the overall timeout for a query
lslboost::asio::steady_timer wave_timer_; // a timer that fires when a new wave should be scheduled
lslboost::asio::steady_timer unicast_timer_; // a timer that fires when the unicast wave should be scheduled
};
}
#endif