#include <algorithm>
#include <cstdlib>
#include <deque>
#include <iostream>
#include <memory>
#include <set>
#include <string>
#include "asio/buffer.hpp"
#include "asio/io_context.hpp"
#include "asio/ip/tcp.hpp"
#include "asio/ip/udp.hpp"
#include "asio/read_until.hpp"
#include "asio/steady_timer.hpp"
#include "asio/write.hpp"
using asio::steady_timer;
using asio::ip::tcp;
using asio::ip::udp;
class subscriber
{
public:
virtual ~subscriber() = default;
virtual void deliver(const std::string& msg) = 0;
};
typedef std::shared_ptr<subscriber> subscriber_ptr;
class channel
{
public:
void join(subscriber_ptr subscriber)
{
subscribers_.insert(subscriber);
}
void leave(subscriber_ptr subscriber)
{
subscribers_.erase(subscriber);
}
void deliver(const std::string& msg)
{
for (const auto& s : subscribers_)
{
s->deliver(msg);
}
}
private:
std::set<subscriber_ptr> subscribers_;
};
class tcp_session
: public subscriber,
public std::enable_shared_from_this<tcp_session>
{
public:
tcp_session(tcp::socket socket, channel& ch)
: channel_(ch),
socket_(std::move(socket))
{
input_deadline_.expires_at(steady_timer::time_point::max());
output_deadline_.expires_at(steady_timer::time_point::max());
non_empty_output_queue_.expires_at(steady_timer::time_point::max());
}
void start()
{
channel_.join(shared_from_this());
read_line();
check_deadline(input_deadline_);
await_output();
check_deadline(output_deadline_);
}
private:
void stop()
{
channel_.leave(shared_from_this());
std::error_code ignored_error;
socket_.close(ignored_error);
input_deadline_.cancel();
non_empty_output_queue_.cancel();
output_deadline_.cancel();
}
bool stopped() const
{
return !socket_.is_open();
}
void deliver(const std::string& msg) override
{
output_queue_.push_back(msg + "\n");
non_empty_output_queue_.expires_at(steady_timer::time_point::min());
}
void read_line()
{
input_deadline_.expires_after(std::chrono::seconds(30));
auto self(shared_from_this());
asio::async_read_until(socket_,
asio::dynamic_buffer(input_buffer_), '\n',
[this, self](const std::error_code& error, std::size_t n)
{
if (stopped())
return;
if (!error)
{
std::string msg(input_buffer_.substr(0, n - 1));
input_buffer_.erase(0, n);
if (!msg.empty())
{
channel_.deliver(msg);
}
else
{
if (output_queue_.empty())
{
output_queue_.push_back("\n");
non_empty_output_queue_.expires_at(
steady_timer::time_point::min());
}
}
read_line();
}
else
{
stop();
}
});
}
void await_output()
{
auto self(shared_from_this());
non_empty_output_queue_.async_wait(
[this, self](const std::error_code& )
{
if (stopped())
return;
if (output_queue_.empty())
{
non_empty_output_queue_.expires_at(steady_timer::time_point::max());
await_output();
}
else
{
write_line();
}
});
}
void write_line()
{
output_deadline_.expires_after(std::chrono::seconds(30));
auto self(shared_from_this());
asio::async_write(socket_,
asio::buffer(output_queue_.front()),
[this, self](const std::error_code& error, std::size_t )
{
if (stopped())
return;
if (!error)
{
output_queue_.pop_front();
await_output();
}
else
{
stop();
}
});
}
void check_deadline(steady_timer& deadline)
{
auto self(shared_from_this());
deadline.async_wait(
[this, self, &deadline](const std::error_code& )
{
if (stopped())
return;
if (deadline.expiry() <= steady_timer::clock_type::now())
{
stop();
}
else
{
check_deadline(deadline);
}
});
}
channel& channel_;
tcp::socket socket_;
std::string input_buffer_;
steady_timer input_deadline_{socket_.get_executor()};
std::deque<std::string> output_queue_;
steady_timer non_empty_output_queue_{socket_.get_executor()};
steady_timer output_deadline_{socket_.get_executor()};
};
typedef std::shared_ptr<tcp_session> tcp_session_ptr;
class udp_broadcaster
: public subscriber
{
public:
udp_broadcaster(asio::io_context& io_context,
const udp::endpoint& broadcast_endpoint)
: socket_(io_context)
{
socket_.connect(broadcast_endpoint);
socket_.set_option(udp::socket::broadcast(true));
}
private:
void deliver(const std::string& msg)
{
std::error_code ignored_error;
socket_.send(asio::buffer(msg), 0, ignored_error);
}
udp::socket socket_;
};
class server
{
public:
server(asio::io_context& io_context,
const tcp::endpoint& listen_endpoint,
const udp::endpoint& broadcast_endpoint)
: io_context_(io_context),
acceptor_(io_context, listen_endpoint)
{
channel_.join(
std::make_shared<udp_broadcaster>(
io_context_, broadcast_endpoint));
accept();
}
private:
void accept()
{
acceptor_.async_accept(
[this](const std::error_code& error, tcp::socket socket)
{
if (!error)
{
std::make_shared<tcp_session>(std::move(socket), channel_)->start();
}
accept();
});
}
asio::io_context& io_context_;
tcp::acceptor acceptor_;
channel channel_;
};
int main(int argc, char* argv[])
{
try
{
using namespace std;
if (argc != 4)
{
std::cerr << "Usage: server <listen_port> <bcast_address> <bcast_port>\n";
return 1;
}
asio::io_context io_context;
tcp::endpoint listen_endpoint(tcp::v4(), atoi(argv[1]));
udp::endpoint broadcast_endpoint(
asio::ip::make_address(argv[2]), atoi(argv[3]));
server s(io_context, listen_endpoint, broadcast_endpoint);
io_context.run();
}
catch (std::exception& e)
{
std::cerr << "Exception: " << e.what() << "\n";
}
return 0;
}