#ifndef STREAM_OUTLET_IMPL_H
#define STREAM_OUTLET_IMPL_H
#include <iostream>
#include "forward.h"
#include "send_buffer.h"
#include "stream_info_impl.h"
#include "common.h"
#include "api_config.h"
#include "sample.h"
using lslboost::asio::ip::tcp;
using lslboost::asio::ip::udp;
namespace lsl {
typedef lslboost::shared_ptr<lslboost::thread> thread_p;
class stream_outlet_impl: public lslboost::noncopyable {
public:
stream_outlet_impl(const stream_info_impl &info, int32_t chunk_size=0, int32_t max_capacity=512000);
~stream_outlet_impl();
void push_sample(const std::vector<float> &data, double timestamp=0.0, bool pushthrough=true) { check_numchan((int32_t)data.size()); enqueue(&data[0],timestamp,pushthrough); }
void push_sample(const std::vector<double> &data, double timestamp=0.0, bool pushthrough=true) { check_numchan((int32_t)data.size()); enqueue(&data[0],timestamp,pushthrough); }
void push_sample(const std::vector<long> &data, double timestamp=0.0, bool pushthrough=true) { check_numchan((int32_t)data.size()); enqueue(&data[0],timestamp,pushthrough); }
void push_sample(const std::vector<int32_t> &data, double timestamp=0.0, bool pushthrough=true) { check_numchan((int32_t)data.size()); enqueue(&data[0],timestamp,pushthrough); }
void push_sample(const std::vector<int16_t> &data, double timestamp=0.0, bool pushthrough=true) { check_numchan((int32_t)data.size()); enqueue(&data[0],timestamp,pushthrough); }
void push_sample(const std::vector<char> &data, double timestamp=0.0, bool pushthrough=true) { check_numchan((int32_t)data.size()); enqueue(&data[0],timestamp,pushthrough); }
void push_sample(const std::vector<std::string> &data, double timestamp=0.0, bool pushthrough=true) { check_numchan((int32_t)data.size()); enqueue(&data[0],timestamp,pushthrough); }
void push_sample(const float *data, double timestamp=0.0, bool pushthrough=true) { enqueue(data,timestamp,pushthrough); }
void push_sample(const double *data, double timestamp=0.0, bool pushthrough=true) { enqueue(data,timestamp,pushthrough); }
void push_sample(const long *data, double timestamp=0.0, bool pushthrough=true) { enqueue(data,timestamp,pushthrough); }
void push_sample(const int32_t *data, double timestamp=0.0, bool pushthrough=true) { enqueue(data,timestamp,pushthrough); }
void push_sample(const int16_t *data, double timestamp=0.0, bool pushthrough=true) { enqueue(data,timestamp,pushthrough); }
void push_sample(const char *data, double timestamp=0.0, bool pushthrough=true) { enqueue(data,timestamp,pushthrough); }
void push_sample(const std::string *data, double timestamp=0.0, bool pushthrough=true) { enqueue(data,timestamp,pushthrough); }
template <typename T>
inline lsl_error_code_t push_sample_noexcept(const T* data, double timestamp = 0.0,
bool pushthrough = true) BOOST_NOEXCEPT {
try {
enqueue(data, timestamp, pushthrough);
return lsl_no_error;
} catch (std::range_error& e) {
std::cerr << "Error during push_sample: " << e.what() << std::endl;
return lsl_argument_error;
} catch (std::invalid_argument& e) {
std::cerr << "Error during push_sample: " << e.what() << std::endl;
return lsl_argument_error;
} catch (std::exception& e) {
std::cerr << "Unexpected error during push_sample: " << e.what() << std::endl;
return lsl_internal_error;
}
}
void push_numeric_raw(const void *data, double timestamp=0.0, bool pushthrough=true) {
if (lsl::api_config::get_instance()->force_default_timestamps())
timestamp = 0.0;
sample_p smp(sample_factory_->new_sample(timestamp == 0.0 ? lsl_clock() : timestamp, pushthrough));
smp->assign_untyped(data);
send_buffer_->push_sample(smp);
}
template<class T> void push_chunk_multiplexed(const T *data_buffer, const double *timestamp_buffer, std::size_t data_buffer_elements, bool pushthrough=true) {
std::size_t num_chans = info().channel_count(), num_samples = data_buffer_elements/num_chans;
if (data_buffer_elements % num_chans != 0)
throw std::runtime_error("The number of buffer elements to send is not a multiple of the stream's channel count.");
if (!data_buffer)
throw std::runtime_error("The data buffer pointer must not be NULL.");
if (!timestamp_buffer)
throw std::runtime_error("The timestamp buffer pointer must not be NULL.");
for (std::size_t k=0; k<num_samples; k++)
enqueue(&data_buffer[k*num_chans],timestamp_buffer[k],pushthrough && k==num_samples-1);
}
template<class T> int32_t push_chunk_multiplexed_noexcept(const T *data_buffer, const double *timestamp_buffer, std::size_t data_buffer_elements, bool pushthrough=true) BOOST_NOEXCEPT {
try {
push_chunk_multiplexed(data_buffer, timestamp_buffer, data_buffer_elements, pushthrough);
return lsl_no_error;
}
catch(std::range_error &e) {
std::cerr << "Error during push_chunk: " << e.what() << std::endl;
return lsl_argument_error;
}
catch(std::invalid_argument &e) {
std::cerr << "Error during push_chunk: " << e.what() << std::endl;
return lsl_argument_error;
}
catch(std::exception &e) {
std::cerr << "Unexpected error during push_chunk: " << e.what() << std::endl;
return lsl_internal_error;
}
}
template<class T> void push_chunk_multiplexed(const T *buffer, std::size_t buffer_elements, double timestamp=0.0, bool pushthrough=true) {
std::size_t num_chans = info().channel_count(), num_samples = buffer_elements/num_chans;
if (buffer_elements % num_chans != 0)
throw std::runtime_error("The number of buffer elements to send is not a multiple of the stream's channel count.");
if (!buffer)
throw std::runtime_error("The number of buffer elements to send is not a multiple of the stream's channel count.");
if (num_samples > 0) {
if (timestamp == 0.0)
timestamp = lsl_clock();
if (info().nominal_srate() != IRREGULAR_RATE)
timestamp = timestamp - (num_samples-1)/info().nominal_srate();
push_sample(buffer,timestamp,pushthrough && (num_samples==1));
for (std::size_t k=1; k<num_samples; k++)
push_sample(&buffer[k*num_chans],DEDUCED_TIMESTAMP,pushthrough && (k==num_samples-1));
}
}
template<class T> int32_t push_chunk_multiplexed_noexcept(const T *data, std::size_t data_elements, double timestamp=0.0, bool pushthrough=true) BOOST_NOEXCEPT {
try {
push_chunk_multiplexed(data, data_elements, timestamp, pushthrough);
return lsl_no_error;
}
catch(std::range_error &e) {
std::cerr << "Error during push_chunk: " << e.what() << std::endl;
return lsl_argument_error;
}
catch(std::invalid_argument &e) {
std::cerr << "Error during push_chunk: " << e.what() << std::endl;
return lsl_argument_error;
}
catch(std::exception &e) {
std::cerr << "Unexpected error during push_chunk: " << e.what() << std::endl;
return lsl_internal_error;
}
}
const stream_info_impl &info() const;
bool have_consumers();
bool wait_for_consumers(double timeout=FOREVER);
private:
void instantiate_stack(tcp tcp_protocol, udp udp_protocol);
void run_io(io_context_p &ios);
template<class T> void enqueue(T* data, double timestamp, bool pushthrough) {
if (lsl::api_config::get_instance()->force_default_timestamps())
timestamp = 0.0;
sample_p smp(sample_factory_->new_sample(timestamp == 0.0 ? lsl_clock() : timestamp, pushthrough));
smp->assign_typed(data);
send_buffer_->push_sample(smp);
}
void check_numchan(int32_t chns) {
if (chns != info_->channel_count())
throw std::range_error("The provided sample data has a different length than the stream's number of channels.");
}
factory_p sample_factory_; int32_t chunk_size_; stream_info_impl_p info_; send_buffer_p send_buffer_; std::vector<io_context_p> ios_;
std::vector<tcp_server_p> tcp_servers_; std::vector<udp_server_p> udp_servers_; std::vector<udp_server_p> responders_; std::vector<thread_p> io_threads_; };
}
#endif