#include "consumer_queue.h"
#include "send_buffer.h"
#include <boost/bind.hpp>
using namespace lsl;
/// Construct a send buffer.
/// @param max_capacity Stored in max_capacity_; new_consumer() uses it as the
///        default and as the upper bound for the size of each consumer queue.
send_buffer::send_buffer(int max_capacity)
    : max_capacity_(max_capacity)
{
}
/// Create a new consumer queue attached to this send buffer.
/// @param max_buffered Requested queue size. 0 selects max_capacity_; any
///        other value is used as-is unless it exceeds max_capacity_, in which
///        case max_capacity_ is used instead.
/// @return A consumer_queue_p owning a freshly allocated consumer_queue that
///         was constructed with the chosen size and shared_from_this().
consumer_queue_p send_buffer::new_consumer(int max_buffered) {
    // Start from the buffer-wide capacity and only shrink it when the caller
    // asked for a specific (non-zero) size that is smaller.
    int queue_size = max_capacity_;
    if (max_buffered != 0 && max_buffered < max_capacity_)
        queue_size = max_buffered;
    return consumer_queue_p(new consumer_queue(queue_size, shared_from_this()));
}
void send_buffer::push_sample(const sample_p &s) {
lslboost::lock_guard<lslboost::mutex> lock(consumers_mut_);
for (consumer_set::iterator i=consumers_.begin(); i != consumers_.end(); i++)
(*i)->push_sample(s);
}
/// Add a consumer queue to the set that receives pushed samples.
/// @param q Queue to insert into consumers_.
void send_buffer::register_consumer(consumer_queue *q) {
    lslboost::unique_lock<lslboost::mutex> guard(consumers_mut_);
    consumers_.insert(q);
    // Release the mutex before notifying so that woken waiters (see
    // wait_for_consumers()) can acquire it right away.
    guard.unlock();
    some_registered_.notify_all();
}
/// Remove a consumer queue from the set that receives pushed samples.
/// @param q Queue to erase from consumers_; the erase is performed while
///        holding consumers_mut_.
void send_buffer::unregister_consumer(consumer_queue *q) {
    lslboost::lock_guard<lslboost::mutex> guard(consumers_mut_);
    consumers_.erase(q);
}
bool send_buffer::have_consumers() {
lslboost::lock_guard<lslboost::mutex> lock(consumers_mut_);
return some_registered();
}
bool send_buffer::wait_for_consumers(double timeout) {
lslboost::unique_lock<lslboost::mutex> lock(consumers_mut_);
return some_registered_.wait_for(lock, lslboost::chrono::duration<double>(timeout), lslboost::bind(&send_buffer::some_registered,this));
}