1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
#ifndef SEND_BUFFER_H
#define SEND_BUFFER_H
#include <boost/container/flat_set.hpp>
#include <boost/smart_ptr/enable_shared_from_this.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition_variable.hpp>
#include "common.h"
#include "forward.h"
namespace lsl {
/**
* A thread-safe single-producer multiple-consumer queue where each consumer gets every pushed sample.
* If the bounded capacity is exhausted, the oldest samples will be erased.
*
* Note that the send_buffer is actually just a dispatcher that distributes the data to producer-consumer
* queues (each of which can have its own capacity preferences). The ownership of the send_buffer is shared
* between the consumer_queues and the owner of the send_buffer.
*/
class send_buffer: public lslboost::enable_shared_from_this<send_buffer> {
/// Sorted set of raw pointers to the registered consumer queues.
/// NOTE(review): presumably non-owning, since new_consumer() hands the queue out as a shared pointer
/// and consumers unregister themselves -- confirm against consumer_queue.
typedef lslboost::container::flat_set<consumer_queue*> consumer_set;
public:
/**
* Create a new send buffer.
* @param max_capacity Hard upper bound on queue capacity beyond which the oldest samples will be dropped.
*
* NOTE(review): this single-argument constructor is not marked explicit, so an int can convert
* implicitly to a send_buffer; consider whether that is intended.
*/
send_buffer(int max_capacity);
/**
* Add a new consumer queue to the buffer.
* Each consumer will get all samples (although the oldest samples will be dropped when the buffer capacity is overrun).
* The consumer is automatically removed upon destruction.
* @param max_buffered If non-zero, the queue size for this consumer will be constrained to be no larger than this value.
* The default of 0 means that no per-consumer limit is requested. In either case the actual
* queue size will never exceed the max_capacity of the send_buffer (which acts as a global limit).
* @return Shared pointer to the newly created consumer.
*/
consumer_queue_p new_consumer(int max_buffered=0);
/**
* Push a sample onto the send buffer.
* Will subsequently be received by all consumers.
* @param s The sample to distribute (passed by const reference).
*/
void push_sample(const sample_p &s);
/**
* Wait until some consumers are present.
* @param timeout Maximum time to wait; defaults to FOREVER.
* NOTE(review): the unit is not visible in this header (presumably seconds) -- confirm in the implementation.
* @return NOTE(review): presumably true if a consumer is registered and false if the wait timed out --
* confirm in the implementation.
*/
bool wait_for_consumers(double timeout=FOREVER);
/// Check whether any consumer is currently registered.
bool have_consumers();
private:
// consumer_queue needs access to register_consumer() / unregister_consumer() below.
friend class consumer_queue;
/// Register a new consumer (called by the consumer_queue).
void register_consumer(consumer_queue *q);
/// Unregister a previously registered consumer (called by the consumer_queue).
void unregister_consumer(consumer_queue *q);
/// Condition that wait_for_consumers is waiting for: true if at least one consumer is registered.
/// Does not lock consumers_mut_ itself; presumably the caller is expected to hold it -- verify at the call sites.
bool some_registered() const { return !consumers_.empty(); }
int max_capacity_; // maximum capacity beyond which the oldest samples will be dropped
consumer_set consumers_; // a set of registered consumer queues
lslboost::mutex consumers_mut_; // mutex to protect the integrity of consumers_
lslboost::condition_variable some_registered_; // condition variable signaling that a consumer has registered
};
}
#endif