#ifndef CANCELLABLE_STREAMBUF_HPP
#define CANCELLABLE_STREAMBUF_HPP
#if defined(_MSC_VER) && (_MSC_VER >= 1200)
# pragma once
#endif
#include "cancellation.h"
#include <exception>
#include <set>
#include <stdexcept>
#include <streambuf>
#include <boost/asio/detail/config.hpp>
#include <boost/utility/base_from_member.hpp>
#include <boost/asio/basic_socket.hpp>
#include <boost/asio/detail/array.hpp>
#include <boost/asio/detail/throw_error.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/stream_socket_service.hpp>
#include <boost/bind/bind.hpp>
using lslboost::asio::ip::tcp;
using lslboost::asio::io_context;
using lslboost::asio::basic_socket;
namespace lsl {
/// Protocol type whose socket and endpoint types are used by the stream buffer below.
typedef tcp Protocol;

/**
 * Stream buffer that performs its socket I/O through asynchronous operations which it
 * drives itself on a privately owned io_context, so that cancel() can close the socket
 * and thereby end an operation that is currently being waited for.
 *
 * The io_context is held through base_from_member so that it is constructed before the
 * basic_socket base class that is bound to it.
 */
class cancellable_streambuf: public std::streambuf, private lslboost::base_from_member<io_context>, public basic_socket<Protocol>, public lsl::cancellable_obj {
public:
	/// Construct an unconnected stream buffer with empty get and put areas.
	cancellable_streambuf():
		basic_socket<Protocol>(lslboost::base_from_member<lslboost::asio::io_context>::member),
		bytes_transferred_(0), cancel_issued_(false), cancel_started_(false) {
		init_buffers();
	}

	/// Destructor: unregisters the object and tries to send any still-buffered output.
	virtual ~cancellable_streambuf() {
		// NOTE(review): unregister_from_all() presumably comes from lsl::cancellable_obj; confirm.
		unregister_from_all();
		// Only attempt a flush if the put area actually holds unsent characters.
		if (pptr() != pbase())
			overflow(traits_type::eof());
	}

	/**
	 * Request cancellation of the current and all future operations on this buffer.
	 * Sets the cancel flag and posts close_if_open() to the io_context; the socket is
	 * closed once that handler is run by one of the run_one() loops of this class.
	 * After this call, connect() throws.
	 */
	void cancel() {
		cancel_issued_ = true;
		lslboost::lock_guard<lslboost::recursive_mutex> lock(cancel_mut_);
		cancel_started_ = false;
		this->get_service().get_io_context().post(lslboost::bind(&cancellable_streambuf::close_if_open,this));
	}

	/**
	 * Connect the socket to the given endpoint, waiting until the attempt has finished
	 * or cancel() has been issued.
	 * @param endpoint The remote endpoint to connect to.
	 * @return this on success, 0 otherwise (the error code is available via puberror()).
	 * @throws std::runtime_error if cancel() has already been issued on this object.
	 */
	cancellable_streambuf* connect(const Protocol::endpoint& endpoint) {
		{
			lslboost::lock_guard<lslboost::recursive_mutex> lock(cancel_mut_);
			if (cancel_issued_)
				throw std::runtime_error("Attempt to connect() a cancellable_streambuf after it has been cancelled.");

			// Start from a clean state: empty buffers and a closed socket.
			init_buffers();
			this->basic_socket<Protocol>::close(ec_);

			io_handler handler = { this };
			this->basic_socket<Protocol>::async_connect(endpoint, handler);
			this->get_service().get_io_context().reset();
		}
		// would_block serves as the "handler has not run yet" marker for the loop below.
		ec_ = lslboost::asio::error::would_block;
		do
			this->get_service().get_io_context().run_one();
		while (!cancel_issued_ && ec_ == lslboost::asio::error::would_block);
		// If the loop was left because of a cancel request while the connect handler had
		// not run, report an aborted operation instead of leaking the internal marker.
		if (ec_ == lslboost::asio::error::would_block)
			ec_ = lslboost::asio::error::operation_aborted;
		return !ec_ ? this : 0;
	}

	/**
	 * Flush pending output and close the socket.
	 * The buffers are re-initialized only if closing succeeded.
	 * @return this on success, 0 if closing the socket reported an error.
	 */
	cancellable_streambuf* close() {
		sync();
		this->basic_socket<Protocol>::close(ec_);
		if (!ec_)
			init_buffers();
		return !ec_ ? this : 0;
	}

	/// Public accessor for the error code of the most recent operation (see error()).
	const lslboost::system::error_code& puberror() const { return error(); }

protected:
	/// Close the socket if it is open and no close has been started since the last cancel().
	void close_if_open() {
		if (!cancel_started_ && this->is_open()) {
			cancel_started_ = true;
			close();
		}
	}

	/// Reset the io_context under the cancel mutex, first closing the socket if a cancel was issued.
	void protected_reset() {
		lslboost::lock_guard<lslboost::recursive_mutex> lock(cancel_mut_);
		if (cancel_issued_)
			close_if_open();
		this->get_service().get_io_context().reset();
	}

	/**
	 * Refill the get area from the socket.
	 * @return The first newly received character, or eof if the get area is not
	 * exhausted when called or if the receive operation ended with an error.
	 */
	int_type underflow() {
		if (gptr() != egptr())
			return traits_type::eof();

		// Receive into the part of the get buffer that follows the putback region.
		io_handler handler = { this };
		this->get_service().async_receive(this->get_implementation(),
			lslboost::asio::buffer(lslboost::asio::buffer(get_buffer_) + putback_max),
			0, handler);

		// Run the io_context until the handler has replaced the would_block marker.
		ec_ = lslboost::asio::error::would_block;
		protected_reset();
		do
			this->get_service().get_io_context().run_one();
		while (ec_ == lslboost::asio::error::would_block);
		if (ec_)
			return traits_type::eof();

		setg(&get_buffer_[0], &get_buffer_[0] + putback_max,
			&get_buffer_[0] + putback_max + bytes_transferred_);
		return traits_type::to_int_type(*gptr());
	}

	/**
	 * Send the entire contents of the put area, then store c (unless it is eof) as the
	 * first character of the emptied put area.
	 * @param c Character to append after flushing, or eof to only flush.
	 * @return eof if a send operation failed, otherwise a value different from eof.
	 */
	int_type overflow(int_type c) {
		lslboost::asio::const_buffer buffer =
			lslboost::asio::buffer(pbase(), pptr() - pbase());
		// A single send may transfer only part of the data, so repeat until nothing is left.
		while (lslboost::asio::buffer_size(buffer) > 0) {
			io_handler handler = { this };
			this->get_service().async_send(this->get_implementation(),
				lslboost::asio::buffer(buffer), 0, handler);

			// Run the io_context until the handler has replaced the would_block marker.
			ec_ = lslboost::asio::error::would_block;
			protected_reset();
			do
				this->get_service().get_io_context().run_one();
			while (ec_ == lslboost::asio::error::would_block);
			if (ec_)
				return traits_type::eof();
			buffer = buffer + bytes_transferred_;
		}
		setp(&put_buffer_[0], &put_buffer_[0] + put_buffer_.size());

		// Nothing to append when the call was a pure flush request.
		if (traits_type::eq_int_type(c, traits_type::eof()))
			return traits_type::not_eof(c);

		*pptr() = traits_type::to_char_type(c);
		pbump(1);
		return c;
	}

	/**
	 * Flush the put area to the socket.
	 * @return 0 on success, -1 if sending failed (the values std::streambuf::sync is
	 * specified to return, rather than the unspecified non-eof value of overflow()).
	 */
	int sync() {
		const int_type result = overflow(traits_type::eof());
		return traits_type::eq_int_type(result, traits_type::eof()) ? -1 : 0;
	}

	/// User-supplied buffers are not supported; the request is ignored and 0 is returned.
	std::streambuf* setbuf(char_type*, std::streamsize) {
		return 0;
	}

	/// Error code set by the most recent operation.
	virtual const lslboost::system::error_code& error() const { return ec_; }

	/// Set up an empty get area (positioned after the putback region) and an empty put area.
	void init_buffers() {
		setg(&get_buffer_[0],
			&get_buffer_[0] + putback_max,
			&get_buffer_[0] + putback_max);
		setp(&put_buffer_[0], &put_buffer_[0] + put_buffer_.size());
	}

	struct io_handler;
	friend struct io_handler;

	/// Completion handler that stores the outcome of an asynchronous operation in its stream buffer.
	struct io_handler {
		cancellable_streambuf* this_;
		void operator()(const lslboost::system::error_code& ec, std::size_t bytes_transferred = 0) {
			this_->ec_ = ec;
			this_->bytes_transferred_ = bytes_transferred;
		}
	};

	/// Number of characters at the start of the get buffer kept free in front of received data.
	enum { putback_max = 8 };
	/// Size in characters of each of the get and put buffers.
	enum { buffer_size = 512 };

	lslboost::asio::detail::array<char, buffer_size> get_buffer_;  ///< storage for the get area
	lslboost::asio::detail::array<char, buffer_size> put_buffer_;  ///< storage for the put area
	lslboost::system::error_code ec_;       ///< result of the last operation; would_block while one is awaited
	std::size_t bytes_transferred_;         ///< byte count reported by the last completion handler
	bool cancel_issued_;                    ///< set by cancel(), never cleared afterwards
	bool cancel_started_;                   ///< set once close_if_open() has begun closing the socket
	lslboost::recursive_mutex cancel_mut_;  ///< held by cancel(), protected_reset() and the setup phase of connect()
};
}
#endif