#include "stream_outlet_impl.h"
#include <boost/bind.hpp>
#include <boost/thread/thread_only.hpp>
#include "tcp_server.h"
#include "udp_server.h"
using namespace lsl;
using namespace lslboost::asio;
stream_outlet_impl::stream_outlet_impl(const stream_info_impl &info, int chunk_size, int max_capacity): chunk_size_(chunk_size), info_(new stream_info_impl(info)),
sample_factory_(new factory(info.channel_format(),info.channel_count(),info.nominal_srate()?info.nominal_srate()*api_config::get_instance()->outlet_buffer_reserve_ms()/1000:api_config::get_instance()->outlet_buffer_reserve_samples())), send_buffer_(new send_buffer(max_capacity))
{
ensure_lsl_initialized();
const api_config *cfg = api_config::get_instance();
if (cfg->allow_ipv4()) try {
instantiate_stack(tcp::v4(), udp::v4());
} catch (std::exception &e) {
std::cerr << "Could not instantiate IPv4 stack: " << e.what() << std::endl;
}
if (cfg->allow_ipv6()) try {
instantiate_stack(tcp::v6(), udp::v6());
} catch (std::exception &e) {
std::cerr << "Could not instantiate IPv6 stack: " << e.what() << std::endl;
}
if (tcp_servers_.empty() || udp_servers_.empty())
throw std::runtime_error("Neither the IPv4 nor the IPv6 stack could be instantiated.");
for (std::size_t k=0;k<tcp_servers_.size();k++)
tcp_servers_[k]->begin_serving();
for (std::size_t k = 0; k < udp_servers_.size(); k++)
udp_servers_[k]->begin_serving();
for (std::size_t k = 0; k < responders_.size(); k++)
responders_[k]->begin_serving();
for (std::size_t k = 0; k < ios_.size(); k++)
io_threads_.push_back(thread_p(new lslboost::thread(lslboost::bind(&stream_outlet_impl::run_io,this,ios_[k]))));
}
void stream_outlet_impl::instantiate_stack(tcp tcp_protocol, udp udp_protocol) {
const api_config *cfg = api_config::get_instance();
std::string listen_address = cfg->listen_address();
std::vector<std::string> multicast_addrs = cfg->multicast_addresses();
int multicast_ttl = cfg->multicast_ttl();
uint16_t multicast_port = cfg->multicast_port();
ios_.push_back(io_context_p(new io_context()));
tcp_servers_.push_back(tcp_server_p(new tcp_server(info_, ios_.back(), send_buffer_, sample_factory_, tcp_protocol, chunk_size_)));
ios_.push_back(io_context_p(new io_context()));
udp_servers_.push_back(udp_server_p(new udp_server(info_, *ios_.back(), udp_protocol)));
for (std::vector<std::string>::iterator i=multicast_addrs.begin(); i != multicast_addrs.end(); i++) {
try {
ip::address address(ip::make_address(*i));
if (udp_protocol == udp::v4() ? address.is_v4() : address.is_v6())
responders_.push_back(udp_server_p(new udp_server(info_, *ios_.back(), *i, multicast_port, multicast_ttl, listen_address)));
} catch(std::exception &e) {
std::clog << "Note (minor): could not create multicast responder for address " << *i << " (failed with: " << e.what() << ")" << std::endl;
}
}
}
stream_outlet_impl::~stream_outlet_impl() {
try {
for (std::size_t k=0;k<tcp_servers_.size();k++)
tcp_servers_[k]->end_serving();
for (std::size_t k=0;k<udp_servers_.size();k++)
udp_servers_[k]->end_serving();
for (std::size_t k=0;k<responders_.size();k++)
responders_[k]->end_serving();
for (std::size_t k=0;k<io_threads_.size();k++)
if (!io_threads_[k]->try_join_for(lslboost::chrono::milliseconds(1000))) {
std::cerr << "Tearing down stream_outlet of thread " << io_threads_[k]->get_id() << " (in id: " << lslboost::this_thread::get_id() << "): " << std::endl;
ios_[k]->stop();
for (int attempt=1; !io_threads_[k]->try_join_for(lslboost::chrono::milliseconds(1000)); attempt++) {
std::cerr << "Trying to kill stream_outlet (attempt #" << attempt << ")..." << std::endl;
io_threads_[k]->interrupt();
}
}
}
catch(std::exception &e) {
std::cerr << "Unexpected error during destruction of a stream outlet (id: " << lslboost::this_thread::get_id() << "): " << e.what() << std::endl;
}
catch(...) {
std::cerr << "Severe error during stream outlet shutdown." << std::endl;
}
}
void stream_outlet_impl::run_io(io_context_p &ios) {
while (true) {
try {
ios->run();
return;
} catch(std::exception &e) {
std::cerr << "Error during io_context processing (id: " << lslboost::this_thread::get_id() << "): " << e.what() << std::endl;
}
}
}
const stream_info_impl &stream_outlet_impl::info() const { return *info_; }
bool stream_outlet_impl::have_consumers() { return send_buffer_->have_consumers(); }
bool stream_outlet_impl::wait_for_consumers(double timeout) { return send_buffer_->wait_for_consumers(timeout); }