#include <iostream>
#include <fstream>
#include <sstream>
#include <string>
#include <stdexcept>
#include <iterator>
#include <map>
#include <srt.h>
#if !defined(_WIN32)
#include <sys/ioctl.h>
#endif
#include "netinet_any.h"
#include "common.h"
#include "api.h"
#include "udt.h"
#include "logging.h"
#include "utilities.h"
#include "apputil.hpp"
#include "socketoptions.hpp"
#include "uriparser.hpp"
#include "testmedia.hpp"
#include "srt_compat.h"
#include "verbose.hpp"
using namespace std;
using srt_logging::SockStatusStr;
#if ENABLE_EXPERIMENTAL_BONDING
using srt_logging::MemberStatusStr;
#endif
volatile bool transmit_throw_on_interrupt = false;
int transmit_bw_report = 0;
unsigned transmit_stats_report = 0;
size_t transmit_chunk_size = SRT_LIVE_DEF_PLSIZE;
bool transmit_printformat_json = false;
srt_listen_callback_fn* transmit_accept_hook_fn = nullptr;
void* transmit_accept_hook_op = nullptr;
bool transmit_use_sourcetime = false;
std::shared_ptr<SrtStatsWriter> transmit_stats_writer;
string DirectionName(SRT_EPOLL_T direction)
{
string dir_name;
if (direction & ~SRT_EPOLL_ERR)
{
if (direction & SRT_EPOLL_IN)
{
dir_name = "source";
}
if (direction & SRT_EPOLL_OUT)
{
if (!dir_name.empty())
dir_name = "relay";
else
dir_name = "target";
}
if (direction & SRT_EPOLL_ERR)
{
dir_name += "+error";
}
}
else
{
dir_name = "stone";
}
return dir_name;
}
template<class FileBase> inline
bytevector FileRead(FileBase& ifile, size_t chunk, const string& filename)
{
bytevector data(chunk);
ifile.read(data.data(), chunk);
size_t nread = ifile.gcount();
if (nread < data.size())
data.resize(nread);
if (data.empty())
throw Source::ReadEOF(filename);
return data;
}
class FileSource: public virtual Source
{
ifstream ifile;
string filename_copy;
public:
FileSource(const string& path): ifile(path, ios::in | ios::binary), filename_copy(path)
{
if (!ifile)
throw std::runtime_error(path + ": Can't open file for reading");
}
MediaPacket Read(size_t chunk) override { return FileRead(ifile, chunk, filename_copy); }
bool IsOpen() override { return bool(ifile); }
bool End() override { return ifile.eof(); }
};
#ifdef PLEASE_LOG
#include "logging.h"
#endif
class FileTarget: public virtual Target
{
ofstream ofile;
public:
FileTarget(const string& path): ofile(path, ios::out | ios::trunc | ios::binary) {}
void Write(const MediaPacket& data) override
{
ofile.write(data.payload.data(), data.payload.size());
#ifdef PLEASE_LOG
extern srt_logging::Logger applog;
applog.Debug() << "FileTarget::Write: " << data.size() << " written to a file";
#endif
}
bool IsOpen() override { return !!ofile; }
bool Broken() override { return !ofile.good(); }
void Close() override
{
#ifdef PLEASE_LOG
extern srt_logging::Logger applog;
applog.Debug() << "FileTarget::Close";
#endif
ofile.close();
}
};
class FileRelay: public Relay
{
fstream iofile;
string filename_copy;
public:
FileRelay(const string& path):
iofile(path, ios::in | ios::out | ios::binary), filename_copy(path)
{
if (!iofile)
throw std::runtime_error(path + ": Can't open file for reading");
}
MediaPacket Read(size_t chunk) override { return FileRead(iofile, chunk, filename_copy); }
void Write(const MediaPacket& data) override
{
iofile.write(data.payload.data(), data.payload.size());
}
bool IsOpen() override { return !!iofile; }
bool End() override { return iofile.eof(); }
bool Broken() override { return !iofile.good(); }
void Close() override { iofile.close(); }
};
template <class Iface> struct File;
template <> struct File<Source> { typedef FileSource type; };
template <> struct File<Target> { typedef FileTarget type; };
template <> struct File<Relay> { typedef FileRelay type; };
template <class Iface>
Iface* CreateFile(const string& name) { return new typename File<Iface>::type (name); }
void SrtCommon::InitParameters(string host, string path, map<string,string> par)
{
if ( Verbose::on && !par.empty())
{
Verb() << "SRT parameters specified:\n";
for (map<string,string>::iterator i = par.begin(); i != par.end(); ++i)
{
Verb() << "\t" << i->first << " = '" << i->second << "'\n";
}
}
if (path != "")
{
if (path.substr(0, 2) != "//")
{
Error("Path specification not supported for SRT (use // in front for special cases)");
}
path = path.substr(2);
#if ENABLE_EXPERIMENTAL_BONDING
if (path == "group")
{
m_group_type = par["type"];
if (m_group_type == "")
{
Error("With //group, the group 'type' must be specified.");
}
vector<string> parts;
Split(m_group_type, '/', back_inserter(parts));
if (parts.size() == 0 || parts.size() > 2)
{
Error("Invalid specification for 'type' parameter");
}
if (parts.size() == 2)
{
m_group_type = parts[0];
m_group_config = parts[1];
}
vector<string> nodes;
Split(par["nodes"], ',', back_inserter(nodes));
if (nodes.empty())
{
Error("With //group, 'nodes' must specify comma-separated host:port specs.");
}
int token = 1;
for (string& hostport: nodes)
{
if (hostport == "")
continue;
size_t atq = hostport.find('?');
if (atq != string::npos)
{
while (atq+1 < hostport.size())
{
size_t next = hostport.find('?', atq+1);
if (next == string::npos)
break;
hostport[next] = '&';
atq = next;
}
}
UriParser check(hostport, UriParser::EXPECT_HOST);
if (check.host() == "" || check.port() == "")
{
Error("With //group, 'nodes' must specify comma-separated host:port specs.");
}
if (check.portno() <= 1024)
{
Error("With //group, every node in 'nodes' must have port >1024");
}
Connection cc(check.host(), check.portno());
if (check.parameters().count("weight"))
{
cc.weight = stoi(check.queryValue("weight"));
}
if (check.parameters().count("source"))
{
UriParser sourcehp(check.queryValue("source"), UriParser::EXPECT_HOST);
cc.source = CreateAddr(sourcehp.host(), sourcehp.portno());
}
UriParser::query_it start = check.parameters().lower_bound("srto.");
SRT_SOCKOPT_CONFIG* config = nullptr;
bool all_clear = true;
vector<string> fails;
map<string, string> options;
if (start != check.parameters().end())
{
for (; start != check.parameters().end(); ++start)
{
auto& y = *start;
if (y.first.substr(0, 5) != "srto.")
break;
options[y.first.substr(5)] = y.second;
}
}
if (!options.empty())
{
config = srt_create_config();
for (auto o: srt_options)
{
if (!options.count(o.name))
continue;
string value = options.at(o.name);
bool ok = o.apply<SocketOption::SRT>(config, value);
if ( !ok )
{
fails.push_back(o.name);
all_clear = false;
}
}
if (!all_clear)
{
srt_delete_config(config);
Error("With //group, failed to set options: " + Printable(fails));
}
cc.options = config;
}
cc.token = token++;
m_group_nodes.push_back(move(cc));
}
par.erase("type");
par.erase("nodes");
par["mode"] = "caller";
}
#endif
}
string adapter;
if (par.count("adapter"))
{
adapter = par.at("adapter");
}
m_mode = "default";
if (par.count("mode"))
{
m_mode = par.at("mode");
}
SocketOption::Mode mode = SrtInterpretMode(m_mode, host, adapter);
if (mode == SocketOption::FAILURE)
{
Error("Invalid mode");
}
if (!m_group_nodes.empty() && mode != SocketOption::CALLER)
{
Error("Group node specification is only available in caller mode");
}
m_mode = SocketOption::mode_names[mode];
par.erase("mode");
if (par.count("blocking"))
{
m_blocking_mode = !false_names.count(par.at("blocking"));
par.erase("blocking");
}
if (par.count("timeout"))
{
m_timeout = stoi(par.at("timeout"), 0, 0);
par.erase("timeout");
}
if (par.count("adapter"))
{
m_adapter = adapter;
par.erase("adapter");
}
else if (m_mode == "listener")
{
m_adapter = host;
}
if (par.count("tsbpd") && false_names.count(par.at("tsbpd")))
{
m_tsbpdmode = false;
}
if (par.count("port"))
{
m_outgoing_port = stoi(par.at("port"), 0, 0);
par.erase("port");
}
if (par.count("transtype") == 0 || par["transtype"] != "file")
{
if (transmit_chunk_size != SRT_LIVE_DEF_PLSIZE)
{
if (transmit_chunk_size > SRT_LIVE_MAX_PLSIZE)
throw std::runtime_error("Chunk size in live mode exceeds 1456 bytes; this is not supported");
par["payloadsize"] = Sprint(transmit_chunk_size);
}
}
if (par.count("groupconfig"))
{
m_group_config = par.at("groupconfig");
par.erase("groupconfig");
}
m_options = par;
m_options["mode"] = m_mode;
}
void SrtCommon::PrepareListener(string host, int port, int backlog)
{
m_bindsock = srt_create_socket();
if (m_bindsock == SRT_ERROR)
Error("srt_create_socket");
int stat = ConfigurePre(m_bindsock);
if (stat == SRT_ERROR)
Error("ConfigurePre");
if (!m_blocking_mode)
{
srt_conn_epoll = AddPoller(m_bindsock, SRT_EPOLL_OUT);
}
auto sa = CreateAddr(host, port);
Verb() << "Binding a server on " << host << ":" << port << " ...";
stat = srt_bind(m_bindsock, sa.get(), sizeof sa);
if (stat == SRT_ERROR)
{
srt_close(m_bindsock);
Error("srt_bind");
}
Verb() << " listen... " << VerbNoEOL;
stat = srt_listen(m_bindsock, backlog);
if (stat == SRT_ERROR)
{
srt_close(m_bindsock);
Error("srt_listen");
}
}
void SrtCommon::StealFrom(SrtCommon& src)
{
m_direction = src.m_direction;
m_blocking_mode = src.m_blocking_mode;
m_timeout = src.m_timeout;
m_tsbpdmode = src.m_tsbpdmode;
m_options = src.m_options;
m_bindsock = SRT_INVALID_SOCK; m_sock = src.m_sock;
src.m_sock = SRT_INVALID_SOCK; }
void SrtCommon::AcceptNewClient()
{
sockaddr_any scl;
::transmit_throw_on_interrupt = true;
if (!m_blocking_mode)
{
Verb() << "[ASYNC] (conn=" << srt_conn_epoll << ")";
int len = 2;
SRTSOCKET ready[2];
if (srt_epoll_wait(srt_conn_epoll, 0, 0, ready, &len, -1, 0, 0, 0, 0) == -1)
Error("srt_epoll_wait(srt_conn_epoll)");
Verb() << "[EPOLL: " << len << " sockets] " << VerbNoEOL;
}
Verb() << " accept..." << VerbNoEOL;
m_sock = srt_accept(m_bindsock, (scl.get()), (&scl.len));
if (m_sock == SRT_INVALID_SOCK)
{
srt_close(m_bindsock);
m_bindsock = SRT_INVALID_SOCK;
Error("srt_accept");
}
#if ENABLE_EXPERIMENTAL_BONDING
if (m_sock & SRTGROUP_MASK)
{
m_listener_group = true;
if (m_group_config != "")
{
int stat = srt_group_configure(m_sock, m_group_config.c_str());
if (stat == SRT_ERROR)
{
Verb() << " (error setting config: '" << m_group_config << "') " << VerbNoEOL;
}
}
#ifndef SRT_OLD_APP_READER
if (srt_epoll != -1)
{
Verb() << "(Group: erasing epoll " << srt_epoll << ") " << VerbNoEOL;
srt_epoll_release(srt_epoll);
}
srt_epoll = srt_epoll_create();
#endif
if (m_group_data.empty())
m_group_data.resize(1);
Verb() << " connected(group epoll " << srt_epoll <<").";
}
else
#endif
{
sockaddr_any peeraddr(AF_INET6);
string peer = "<?PEER?>";
if (-1 != srt_getpeername(m_sock, (peeraddr.get()), (&peeraddr.len)))
{
peer = peeraddr.str();
}
sockaddr_any agentaddr(AF_INET6);
string agent = "<?AGENT?>";
if (-1 != srt_getsockname(m_sock, (agentaddr.get()), (&agentaddr.len)))
{
agent = agentaddr.str();
}
Verb() << " connected [" << agent << "] <-- " << peer;
}
::transmit_throw_on_interrupt = false;
int stat = ConfigurePost(m_sock);
if (stat == SRT_ERROR)
Error("ConfigurePost");
}
static string PrintEpollEvent(int events, int et_events)
{
static pair<int, const char*> const namemap [] = {
make_pair(SRT_EPOLL_IN, "R"),
make_pair(SRT_EPOLL_OUT, "W"),
make_pair(SRT_EPOLL_ERR, "E"),
make_pair(SRT_EPOLL_UPDATE, "U")
};
ostringstream os;
int N = Size(namemap);
for (int i = 0; i < N; ++i)
{
if (events & namemap[i].first)
{
os << "[";
if (et_events & namemap[i].first)
os << "^";
os << namemap[i].second << "]";
}
}
return os.str();
}
void SrtCommon::Init(string host, int port, string path, map<string,string> par, SRT_EPOLL_OPT dir)
{
m_direction = dir;
InitParameters(host, path, par);
int backlog = 1;
if (m_mode == "listener" && par.count("groupconnect")
&& true_names.count(par["groupconnect"]))
{
backlog = 10;
}
Verb() << "Opening SRT " << DirectionName(dir) << " " << m_mode
<< "(" << (m_blocking_mode ? "" : "non-") << "blocking,"
<< " backlog=" << backlog << ") on "
<< host << ":" << port;
try
{
if (m_mode == "caller")
{
if (m_group_nodes.empty())
{
OpenClient(host, port);
}
#if ENABLE_EXPERIMENTAL_BONDING
else
{
OpenGroupClient(); }
#endif
}
else if (m_mode == "listener")
OpenServer(m_adapter, port, backlog);
else if (m_mode == "rendezvous")
OpenRendezvous(m_adapter, host, port);
else
{
throw std::invalid_argument("Invalid 'mode'. Use 'client' or 'server'");
}
}
catch (...)
{
Verb() << "Open FAILED - closing SRT sockets";
if (m_bindsock != SRT_INVALID_SOCK)
srt_close(m_bindsock);
if (m_sock != SRT_INVALID_SOCK)
srt_close(m_sock);
m_sock = m_bindsock = SRT_INVALID_SOCK;
throw;
}
int pbkeylen = 0;
SRT_KM_STATE kmstate, snd_kmstate, rcv_kmstate;
int len = sizeof (int);
srt_getsockflag(m_sock, SRTO_PBKEYLEN, &pbkeylen, &len);
srt_getsockflag(m_sock, SRTO_KMSTATE, &kmstate, &len);
srt_getsockflag(m_sock, SRTO_SNDKMSTATE, &snd_kmstate, &len);
srt_getsockflag(m_sock, SRTO_RCVKMSTATE, &rcv_kmstate, &len);
std::string KmStateStr(SRT_KM_STATE state);
Verb() << "ENCRYPTION status: " << KmStateStr(kmstate)
<< " (SND:" << KmStateStr(snd_kmstate) << " RCV:" << KmStateStr(rcv_kmstate)
<< ") PBKEYLEN=" << pbkeylen;
if (Verbose::on)
{
int64_t bandwidth = 0;
int latency = 0;
bool blocking_snd = false, blocking_rcv = false;
int dropdelay = 0;
int size_int = sizeof (int), size_int64 = sizeof (int64_t), size_bool = sizeof (bool);
srt_getsockflag(m_sock, SRTO_MAXBW, &bandwidth, &size_int64);
srt_getsockflag(m_sock, SRTO_RCVLATENCY, &latency, &size_int);
srt_getsockflag(m_sock, SRTO_RCVSYN, &blocking_rcv, &size_bool);
srt_getsockflag(m_sock, SRTO_SNDSYN, &blocking_snd, &size_bool);
srt_getsockflag(m_sock, SRTO_SNDDROPDELAY, &dropdelay, &size_int);
Verb() << "OPTIONS: maxbw=" << bandwidth << " rcvlatency=" << latency << boolalpha
<< " blocking{rcv=" << blocking_rcv << " snd=" << blocking_snd
<< "} snddropdelay=" << dropdelay;
}
if (!m_blocking_mode)
{
if (m_mode == "caller")
dir = (dir | SRT_EPOLL_UPDATE);
Verb() << "NON-BLOCKING MODE - SUB FOR " << PrintEpollEvent(dir, 0);
srt_epoll = AddPoller(m_sock, dir);
}
}
int SrtCommon::AddPoller(SRTSOCKET socket, int modes)
{
int pollid = srt_epoll_create();
if (pollid == -1)
throw std::runtime_error("Can't create epoll in nonblocking mode");
Verb() << "EPOLL: creating eid=" << pollid << " and adding @" << socket
<< " in " << DirectionName(SRT_EPOLL_OPT(modes)) << " mode";
srt_epoll_add_usock(pollid, socket, &modes);
return pollid;
}
int SrtCommon::ConfigurePost(SRTSOCKET sock)
{
bool yes = m_blocking_mode;
int result = 0;
if (m_direction & SRT_EPOLL_OUT)
{
Verb() << "Setting SND blocking mode: " << boolalpha << yes << " timeout=" << m_timeout;
result = srt_setsockopt(sock, 0, SRTO_SNDSYN, &yes, sizeof yes);
if (result == -1)
{
#ifdef PLEASE_LOG
extern srt_logging::Logger applog;
applog.Error() << "ERROR SETTING OPTION: SRTO_SNDSYN";
#endif
return result;
}
if (m_timeout)
result = srt_setsockopt(sock, 0, SRTO_SNDTIMEO, &m_timeout, sizeof m_timeout);
if (result == -1)
{
#ifdef PLEASE_LOG
extern srt_logging::Logger applog;
applog.Error() << "ERROR SETTING OPTION: SRTO_SNDTIMEO";
#endif
return result;
}
}
if (m_direction & SRT_EPOLL_IN)
{
Verb() << "Setting RCV blocking mode: " << boolalpha << yes << " timeout=" << m_timeout;
result = srt_setsockopt(sock, 0, SRTO_RCVSYN, &yes, sizeof yes);
if (result == -1)
return result;
if (m_timeout)
result = srt_setsockopt(sock, 0, SRTO_RCVTIMEO, &m_timeout, sizeof m_timeout);
if (result == -1)
return result;
}
vector<string> failures;
SrtConfigurePost(sock, m_options, &failures);
if (!failures.empty())
{
if (Verbose::on)
{
Verb() << "WARNING: failed to set options: ";
copy(failures.begin(), failures.end(), ostream_iterator<string>(*Verbose::cverb, ", "));
Verb();
}
}
return 0;
}
int SrtCommon::ConfigurePre(SRTSOCKET sock)
{
int result = 0;
int no = 0;
if (!m_tsbpdmode)
{
result = srt_setsockopt(sock, 0, SRTO_TSBPDMODE, &no, sizeof no);
if (result == -1)
return result;
}
int maybe = m_blocking_mode;
result = srt_setsockopt(sock, 0, SRTO_RCVSYN, &maybe, sizeof maybe);
if (result == -1)
return result;
vector<string> failures;
SocketOption::Mode conmode = SrtConfigurePre(sock, "", m_options, &failures);
if (conmode == SocketOption::FAILURE)
{
if (Verbose::on )
{
Verb() << "WARNING: failed to set options: ";
copy(failures.begin(), failures.end(), ostream_iterator<string>(*Verbose::cverb, ", "));
Verb();
}
return SRT_ERROR;
}
return 0;
}
void SrtCommon::SetupAdapter(const string& host, int port)
{
auto lsa = CreateAddr(host, port);
int stat = srt_bind(m_sock, lsa.get(), sizeof lsa);
if (stat == SRT_ERROR)
Error("srt_bind");
}
void SrtCommon::OpenClient(string host, int port)
{
PrepareClient();
if (m_outgoing_port)
{
SetupAdapter("", m_outgoing_port);
}
ConnectClient(host, port);
}
void SrtCommon::PrepareClient()
{
m_sock = srt_create_socket();
if (m_sock == SRT_ERROR)
Error("srt_create_socket");
int stat = ConfigurePre(m_sock);
if (stat == SRT_ERROR)
Error("ConfigurePre");
if (!m_blocking_mode)
{
srt_conn_epoll = AddPoller(m_sock, SRT_EPOLL_CONNECT | SRT_EPOLL_ERR);
}
}
#if ENABLE_EXPERIMENTAL_BONDING
void TransmitGroupSocketConnect(void* srtcommon, SRTSOCKET sock, int error, const sockaddr* , int token)
{
SrtCommon* that = (SrtCommon*)srtcommon;
if (error == SRT_SUCCESS)
{
return; }
for (auto& n: that->m_group_nodes)
{
if (n.token != -1 && n.token == token)
{
n.error = error;
n.reason = srt_getrejectreason(sock);
return;
}
}
Verb() << " IPE: LINK NOT FOUND???]";
}
void SrtCommon::OpenGroupClient()
{
SRT_GROUP_TYPE type = SRT_GTYPE_UNDEFINED;
if (m_group_type == "broadcast")
type = SRT_GTYPE_BROADCAST;
else if (m_group_type == "backup")
type = SRT_GTYPE_BACKUP;
else if (m_group_type == "balancing")
type = SRT_GTYPE_BALANCING;
else
{
Error("With //group, type='" + m_group_type + "' undefined");
}
m_sock = srt_create_group(type);
if (m_sock == -1)
Error("srt_create_group");
srt_connect_callback(m_sock, &TransmitGroupSocketConnect, this);
int stat = -1;
if (m_group_config != "")
{
stat = srt_group_configure(m_sock, m_group_config.c_str());
if (stat == SRT_ERROR)
Error("srt_group_configure");
}
stat = ConfigurePre(m_sock);
if ( stat == SRT_ERROR )
Error("ConfigurePre");
if (!m_blocking_mode)
{
srt_conn_epoll = AddPoller(m_sock, SRT_EPOLL_CONNECT | SRT_EPOLL_ERR);
}
srt_epoll = srt_epoll_create();
bool any_node = false;
Verb() << "REDUNDANT connections with " << m_group_nodes.size() << " nodes:";
if (m_group_data.empty())
m_group_data.resize(1);
vector<SRT_SOCKGROUPCONFIG> targets;
int namelen = sizeof (sockaddr_any);
Verb() << "Connecting to nodes:";
int i = 1;
for (Connection& c: m_group_nodes)
{
auto sa = CreateAddr(c.host, c.port);
c.target = sa;
Verb() << "\t[" << c.token << "] " << c.host << ":" << c.port << VerbNoEOL;
vector<string> extras;
if (c.weight)
extras.push_back(Sprint("weight=", c.weight));
if (!c.source.empty())
extras.push_back("source=" + c.source.str());
if (!extras.empty())
{
Verb() << "?" << extras[0] << VerbNoEOL;
for (size_t i = 1; i < extras.size(); ++i)
Verb() << "&" << extras[i] << VerbNoEOL;
}
Verb();
++i;
const sockaddr* source = c.source.empty() ? nullptr : c.source.get();
SRT_SOCKGROUPCONFIG gd = srt_prepare_endpoint(source, sa.get(), namelen);
gd.weight = c.weight;
gd.config = c.options;
targets.push_back(gd);
}
Verb() << "Waiting for group connection... " << VerbNoEOL;
int fisock = srt_connect_group(m_sock, targets.data(), targets.size());
if (fisock == SRT_ERROR)
{
ostringstream out;
for (Connection& c: m_group_nodes)
{
if (c.error != SRT_SUCCESS)
{
out << "[" << c.token << "] " << c.host << ":" << c.port;
if (!c.source.empty())
out << "[[" << c.source.str() << "]]";
out << ": " << srt_strerror(c.error, 0) << ": " << srt_rejectreason_str(c.reason) << endl;
}
}
Error("srt_connect_group, nodes:\n" + out.str());
}
if (m_blocking_mode)
{
Verb() << "SUCCESSFUL";
}
else
{
Verb() << "INITIATED [ASYNC]";
}
ConfigurePost(m_sock);
for (size_t i = 0; i < targets.size(); ++i)
{
if (targets[i].id != -1 && targets[i].errorcode == SRT_SUCCESS)
{
m_group_nodes[i].socket = targets[i].id;
}
}
size_t size = m_group_data.size();
stat = srt_group_data(m_sock, m_group_data.data(), &size);
if (stat == -1 && size > m_group_data.size())
{
m_group_data.resize(size);
stat = srt_group_data(m_sock, m_group_data.data(), &size);
}
if (stat == -1)
{
Error("srt_group_data");
}
m_group_data.resize(size);
for (size_t i = 0; i < m_group_nodes.size(); ++i)
{
SRTSOCKET insock = m_group_nodes[i].socket;
if (insock == -1)
{
Verb() << "TARGET '" << sockaddr_any(targets[i].peeraddr).str() << "' connection failed.";
continue;
}
any_node = true;
}
if (!any_node)
Error("All connections failed");
if (!m_blocking_mode)
{
Verb() << "[ASYNC] " << VerbNoEOL;
int len1 = 2, len2 = 2;
SRTSOCKET ready_conn[2], ready_err[2];
if (srt_epoll_wait(srt_conn_epoll,
ready_err, &len2,
ready_conn, &len1,
-1, NULL, NULL,
NULL, NULL) != -1)
{
if (find(ready_err, ready_err+len2, m_sock) != ready_err+len2)
{
Verb() << "[EPOLL: " << len2 << " entities FAILED]";
Error("All group connections failed", SRT_REJ_UNKNOWN, SRT_ENOCONN);
}
else if (find(ready_conn, ready_conn+len1, m_sock) != ready_conn+len1)
{
Verb() << "[EPOLL: " << len1 << " entities] " << VerbNoEOL;
}
else
{
Error("Group: SPURIOUS epoll readiness");
}
}
else
{
Error("srt_epoll_wait");
}
}
stat = ConfigurePost(m_sock);
if (stat == -1)
{
Error("ConfigurePost");
}
Verb() << "Group connection report:";
for (auto& d: m_group_data)
{
Verb() << "@" << d.id << " <" << SockStatusStr(d.sockstate) << "> (=" << d.result << ") PEER:"
<< sockaddr_any((sockaddr*)&d.peeraddr, sizeof d.peeraddr).str();
}
m_group_data.resize(m_group_nodes.size());
}
#endif
struct TransmitErrorReason
{
int error;
int reason;
};
static std::map<SRTSOCKET, TransmitErrorReason> transmit_error_storage;
static void TransmitConnectCallback(void*, SRTSOCKET socket, int errorcode, const sockaddr* , int )
{
int reason = srt_getrejectreason(socket);
transmit_error_storage[socket] = TransmitErrorReason { errorcode, reason };
Verb() << "[Connection error reported on @" << socket << "]";
}
void SrtCommon::ConnectClient(string host, int port)
{
auto sa = CreateAddr(host, port);
Verb() << "Connecting to " << host << ":" << port << " ... " << VerbNoEOL;
if (!m_blocking_mode)
{
srt_connect_callback(m_sock, &TransmitConnectCallback, 0);
}
int stat = srt_connect(m_sock, sa.get(), sizeof sa);
if (stat == SRT_ERROR)
{
int reason = srt_getrejectreason(m_sock);
#if PLEASE_LOG
extern srt_logging::Logger applog;
LOGP(applog.Error, "ERROR reported by srt_connect - closing socket @", m_sock);
#endif
srt_close(m_sock);
Error("srt_connect", reason);
}
if (!m_blocking_mode)
{
Verb() << "[ASYNC] " << VerbNoEOL;
int lenc = 2, lene = 2;
SRTSOCKET ready_connect[2], ready_error[2];
if (srt_epoll_wait(srt_conn_epoll, ready_error, &lene, ready_connect, &lenc, -1, 0, 0, 0, 0) != -1)
{
if (!transmit_error_storage.empty())
{
Verb() << "[CALLBACK(error): " << VerbNoEOL;
int error, reason;
bool failed = false;
for (pair<const SRTSOCKET, TransmitErrorReason>& e: transmit_error_storage)
{
Verb() << "{@" << e.first << " error=" << e.second.error
<< " reason=" << e.second.reason << "} " << VerbNoEOL;
error = e.second.error;
reason = e.second.reason;
if (error != SRT_SUCCESS)
failed = true;
}
Verb() << "]";
transmit_error_storage.clear();
if (failed)
Error("srt_connect(async/cb)", reason, error);
}
if (lene > 0)
{
Verb() << "[EPOLL(error): " << lene << " sockets]";
int reason = srt_getrejectreason(ready_error[0]);
Error("srt_connect(async)", reason, SRT_ECONNREJ);
}
Verb() << "[EPOLL: " << lenc << " sockets] " << VerbNoEOL;
}
else
{
transmit_error_storage.clear();
Error("srt_epoll_wait(srt_conn_epoll)");
}
transmit_error_storage.clear();
}
Verb() << " connected.";
stat = ConfigurePost(m_sock);
if (stat == SRT_ERROR)
Error("ConfigurePost");
}
void SrtCommon::Error(string src, int reason, int force_result)
{
int errnov = 0;
const int result = force_result == 0 ? srt_getlasterror(&errnov) : force_result;
if (result == SRT_SUCCESS)
{
cerr << "\nERROR (app): " << src << endl;
throw std::runtime_error(src);
}
string message = srt_strerror(result, errnov);
if (result == SRT_ECONNREJ)
{
if ( Verbose::on )
Verb() << "FAILURE\n" << src << ": [" << result << "] "
<< "Connection rejected: [" << int(reason) << "]: "
<< srt_rejectreason_str(reason);
else
cerr << "\nERROR #" << result
<< ": Connection rejected: [" << int(reason) << "]: "
<< srt_rejectreason_str(reason);
}
else
{
if ( Verbose::on )
Verb() << "FAILURE\n" << src << ": [" << result << "." << errnov << "] " << message;
else
cerr << "\nERROR #" << result << "." << errnov << ": " << message << endl;
}
throw TransmissionError("error: " + src + ": " + message);
}
void SrtCommon::SetupRendezvous(string adapter, string host, int port)
{
sockaddr_any target = CreateAddr(host, port);
if (target.family() == AF_UNSPEC)
{
Error("Unable to resolve target host: " + host);
}
bool yes = true;
srt_setsockopt(m_sock, 0, SRTO_RENDEZVOUS, &yes, sizeof yes);
const int outport = m_outgoing_port ? m_outgoing_port : port;
auto localsa = CreateAddr(adapter, outport, target.family());
string showhost = adapter;
if (showhost == "")
showhost = "ANY";
if (target.family() == AF_INET6)
showhost = "[" + showhost + "]";
Verb() << "Binding rendezvous: " << showhost << ":" << outport << " ...";
int stat = srt_bind(m_sock, localsa.get(), localsa.size());
if (stat == SRT_ERROR)
{
srt_close(m_sock);
Error("srt_bind");
}
}
void SrtCommon::Close()
{
#if PLEASE_LOG
extern srt_logging::Logger applog;
LOGP(applog.Error, "CLOSE requested - closing socket @", m_sock);
#endif
bool any = false;
bool yes = true;
if (m_sock != SRT_INVALID_SOCK)
{
Verb() << "SrtCommon: DESTROYING CONNECTION, closing socket (rt%" << m_sock << ")...";
srt_setsockflag(m_sock, SRTO_SNDSYN, &yes, sizeof yes);
srt_close(m_sock);
any = true;
}
if (m_bindsock != SRT_INVALID_SOCK)
{
Verb() << "SrtCommon: DESTROYING SERVER, closing socket (ls%" << m_bindsock << ")...";
srt_setsockflag(m_bindsock, SRTO_SNDSYN, &yes, sizeof yes);
srt_close(m_bindsock);
any = true;
}
if (any)
Verb() << "SrtCommon: ... done.";
}
SrtCommon::~SrtCommon()
{
Close();
}
#if ENABLE_EXPERIMENTAL_BONDING
void SrtCommon::UpdateGroupStatus(const SRT_SOCKGROUPDATA* grpdata, size_t grpdata_size)
{
if (!grpdata)
{
cerr << "ERROR: broadcast group update reports " << grpdata_size
<< " existing sockets, but app registerred only " << m_group_nodes.size() << endl;
Error("Too many unpredicted sockets in the group");
}
const SRT_SOCKGROUPDATA* gend = grpdata + grpdata_size;
for (auto& n: m_group_nodes)
{
bool active = (find_if(grpdata, gend,
[&n] (const SRT_SOCKGROUPDATA& sg) { return sg.id == n.socket; }) != gend);
if (!active)
n.socket = SRT_INVALID_SOCK;
}
for (size_t i = 0; i < grpdata_size; ++i)
{
const SRT_SOCKGROUPDATA& d = grpdata[i];
SRTSOCKET id = d.id;
SRT_SOCKSTATUS status = d.sockstate;
int result = d.result;
SRT_MEMBERSTATUS mstatus = d.memberstate;
if (result != -1 && status == SRTS_CONNECTED)
{
Verb() << "G@" << id << "<" << MemberStatusStr(mstatus) << "> " << VerbNoEOL;
continue;
}
Verb() << "\n\tG@" << id << " <" << SockStatusStr(status) << "/" << MemberStatusStr(mstatus) << "> (=" << result << ") PEER:"
<< sockaddr_any((sockaddr*)&d.peeraddr, sizeof d.peeraddr).str() << VerbNoEOL;
if (status >= SRTS_BROKEN)
{
Verb() << "NOTE: socket @" << id << " is pending for destruction, waiting for it.";
}
}
int i = 1;
for (auto& n: m_group_nodes)
{
if (n.error != SRT_SUCCESS)
{
Verb() << "[" << i << "] CONNECTION FAILURE to '" << n.host << ":" << n.port << "': "
<< srt_strerror(n.error, 0) << ":" << srt_rejectreason_str(n.reason);
}
if (n.socket != SRT_INVALID_SOCK)
continue;
auto sa = CreateAddr(n.host, n.port);
Verb() << "[" << i << "] RECONNECTING to node " << n.host << ":" << n.port << " ... " << VerbNoEOL;
++i;
n.error = SRT_SUCCESS;
n.reason = SRT_REJ_UNKNOWN;
const sockaddr* source = n.source.empty() ? nullptr : n.source.get();
SRT_SOCKGROUPCONFIG gd = srt_prepare_endpoint(source, sa.get(), sa.size());
gd.weight = n.weight;
gd.config = n.options;
gd.token = n.token;
int fisock = srt_connect_group(m_sock, &gd, 1);
if (fisock == SRT_ERROR)
{
Verb() << "FAILED: ";
}
else
{
n.socket = gd.id;
}
}
}
#endif
SrtSource::SrtSource(string host, int port, std::string path, const map<string,string>& par)
{
Init(host, port, path, par, SRT_EPOLL_IN);
ostringstream os;
os << host << ":" << port;
hostport_copy = os.str();
}
static void PrintSrtStats(SRTSOCKET sock, bool clr, bool bw, bool stats)
{
CBytePerfMon perf;
srt_bstats(sock, &perf, clr);
if (bw)
cout << transmit_stats_writer->WriteBandwidth(perf.mbpsBandwidth);
if (stats)
cout << transmit_stats_writer->WriteStats(sock, perf);
}
#ifdef SRT_OLD_APP_READER
bool SrtSource::GroupCheckPacketAhead(bytevector& output)
{
bool status = false;
vector<SRTSOCKET> past_ahead;
for (auto i = m_group_positions.begin(); i != m_group_positions.end(); ++i)
{
ReadPos& a = i->second;
int seqdiff = CSeqNo::seqcmp(a.sequence, m_group_seqno);
if ( seqdiff == 1)
{
m_group_seqno = a.sequence;
Verb() << " (SRT group: ahead delivery %" << a.sequence << " from @" << i->first << ")";
swap(output, a.packet);
status = true;
}
else if (seqdiff < 1 && !a.packet.empty())
{
Verb() << " (@" << i->first << " dropping collected ahead %" << a.sequence << ")";
a.packet.clear();
}
}
return status;
}
static string DisplayEpollResults(const std::set<SRTSOCKET>& sockset, std::string prefix)
{
typedef set<SRTSOCKET> fset_t;
ostringstream os;
os << prefix << " ";
for (fset_t::const_iterator i = sockset.begin(); i != sockset.end(); ++i)
{
os << "@" << *i << " ";
}
return os.str();
}
bytevector SrtSource::GroupRead(size_t chunk)
{
bytevector output;
set<SRTSOCKET> broken;
RETRY_READING:
size_t size = m_group_data.size();
int stat = srt_group_data(m_sock, m_group_data.data(), &size);
if (stat == -1 && size > m_group_data.size())
{
m_group_data.resize(size);
stat = srt_group_data(m_sock, m_group_data.data(), &size);
}
else
{
m_group_data.resize(size);
}
if (stat == -1) {
Error(UDT::getlasterror(), "FAILURE when reading group data");
}
if (size == 0)
{
Error("No sockets in the group - disconnected");
}
bool connected = false;
for (auto& d: m_group_data)
{
if (d.status == SRTS_CONNECTED)
{
connected = true;
break;
}
}
if (!connected)
{
Error("All sockets in the group disconnected");
}
if (Verbose::on)
{
for (auto& d: m_group_data)
{
if (d.status != SRTS_CONNECTED)
Verb() << "@" << d.id << " <" << SockStatusStr(d.status) << "> (=" << d.result << ") PEER:"
<< sockaddr_any((sockaddr*)&d.peeraddr, sizeof d.peeraddr).str();
}
}
if (m_group_seqno != -1 && !m_group_positions.empty())
{
bytevector ahead_packet;
if (GroupCheckPacketAhead(ahead_packet))
return move(ahead_packet);
}
Verb() << "E(" << srt_epoll << ") " << VerbNoEOL;
for (size_t i = 0; i < size; ++i)
{
SRT_SOCKGROUPDATA& d = m_group_data[i];
if (d.status == SRTS_CONNECTING)
{
Verb() << "@" << d.id << "<pending> " << VerbNoEOL;
int modes = SRT_EPOLL_OUT | SRT_EPOLL_ERR;
srt_epoll_add_usock(srt_epoll, d.id, &modes);
continue; }
if (d.status >= SRTS_BROKEN)
{
broken.insert(d.id);
}
if (broken.count(d.id))
{
Verb() << "@" << d.id << "<broken> " << VerbNoEOL;
continue;
}
if (d.status != SRTS_CONNECTED)
{
Verb() << "@" << d.id << "<idle:" << SockStatusStr(d.status) << "> " << VerbNoEOL;
continue;
}
int modes = SRT_EPOLL_IN | SRT_EPOLL_ERR;
srt_epoll_add_usock(srt_epoll, d.id, &modes);
Verb() << "@" << d.id << "[READ] " << VerbNoEOL;
}
Verb() << "";
SrtPollState sready;
if (UDT::epoll_swait(srt_epoll, sready, -1) == SRT_ERROR)
{
Error(UDT::getlasterror(), "UDT::epoll_swait(srt_epoll, group)");
}
if (Verbose::on)
{
Verb() << "RDY: {"
<< DisplayEpollResults(sready.rd(), "[R]")
<< DisplayEpollResults(sready.wr(), "[W]")
<< DisplayEpollResults(sready.ex(), "[E]")
<< "} " << VerbNoEOL;
}
LOGC(applog.Debug, log << "epoll_swait: "
<< DisplayEpollResults(sready.rd(), "[R]")
<< DisplayEpollResults(sready.wr(), "[W]")
<< DisplayEpollResults(sready.ex(), "[E]"));
typedef set<SRTSOCKET> fset_t;
broken = sready.ex();
int32_t next_seq = m_group_seqno;
for (fset_t::const_iterator i = sready.rd().begin(); i != sready.rd().end(); ++i)
{
SRTSOCKET id = *i;
ReadPos* p = nullptr;
auto pe = m_group_positions.find(id);
if (pe != m_group_positions.end())
{
p = &pe->second;
int seqdiff = CSeqNo::seqcmp(p->sequence, m_group_seqno);
if (seqdiff > 1)
{
Verb() << "EPOLL: @" << id << " %" << p->sequence << " AHEAD, not reading.";
continue;
}
}
int fi = 1; for (;;)
{
bytevector data(chunk);
SRT_MSGCTRL mctrl = srt_msgctrl_default;
stat = srt_recvmsg2(id, data.data(), chunk, &mctrl);
if (stat == SRT_ERROR)
{
if (fi == 0)
{
if (Verbose::on)
{
if (p)
{
int32_t pktseq = p->sequence;
int seqdiff = CSeqNo::seqcmp(p->sequence, m_group_seqno);
Verb() << ". %" << pktseq << " " << seqdiff << ")";
}
else
{
Verb() << ".)";
}
}
fi = 1;
}
int err = srt_getlasterror(0);
if (err == SRT_EASYNCRCV)
{
break;
}
Verb() << "Error @" << id << ": " << srt_getlasterror_str();
broken.insert(id);
break;
}
if (m_group_seqno != -1 && abs(m_group_seqno - mctrl.pktseq) > CSeqNo::m_iSeqNoTH)
{
if (fi == 0)
{
Verb() << ".)";
fi = 1;
}
Verb() << "Error @" << id << ": SEQUENCE DISCREPANCY: base=%" << m_group_seqno << " vs pkt=%" << mctrl.pktseq << ", setting ESECFAIL";
broken.insert(id);
break;
}
if (!p)
{
p = &(m_group_positions[id] = ReadPos { mctrl.pktseq, {} });
}
else
{
p->sequence = mctrl.pktseq;
}
if (m_group_seqno != -1)
{
int seqdiff = CSeqNo::seqcmp(mctrl.pktseq, m_group_seqno);
if (seqdiff <= 0)
{
if (fi == 1)
{
Verb() << "(@" << id << " FLUSH:" << VerbNoEOL;
fi = 0;
}
Verb() << "." << VerbNoEOL;
continue;
}
if (fi == 0)
{
Verb() << ". %" << mctrl.pktseq << " " << (-seqdiff) << ")";
fi = 1;
}
if (seqdiff > 1)
{
Verb() << "@" << id << " %" << mctrl.pktseq << " AHEAD";
p->packet = move(data);
break; }
}
if (!output.empty())
{
Verb() << "@" << id << " %" << mctrl.pktseq << " REDUNDANT";
break;
}
Verb() << "@" << id << " %" << mctrl.pktseq << " DELIVERING";
output = move(data);
next_seq = mctrl.pktseq;
break;
}
}
if (broken.size() == size)
{
Error("All sockets broken");
}
if (Verbose::on && !broken.empty())
{
Verb() << "BROKEN: " << Printable(broken) << " - removing";
}
for (SRTSOCKET d: broken)
{
m_group_positions.erase(d);
srt_close(d);
}
broken.clear();
if (!output.empty())
{
if (next_seq == -1)
{
Error("IPE: next_seq not set after output extracted!");
}
m_group_seqno = next_seq;
return output;
}
if (!m_group_positions.empty())
{
set<SRTSOCKET> elephants;
pair<const SRTSOCKET, ReadPos>* slowest_kangaroo = nullptr;
for (auto& sock_rp: m_group_positions)
{
int seqdiff = CSeqNo::seqcmp(sock_rp.second.sequence, m_group_seqno);
if (seqdiff < 0)
{
elephants.insert(sock_rp.first);
}
else if (seqdiff > 0)
{
if (!slowest_kangaroo)
{
slowest_kangaroo = &sock_rp;
}
else
{
int seqdiff = CSeqNo::seqcmp(slowest_kangaroo->second.sequence, sock_rp.second.sequence);
if (seqdiff > 0)
{
slowest_kangaroo = &sock_rp;
}
}
}
}
if (slowest_kangaroo)
{
m_group_seqno = slowest_kangaroo->second.sequence;
Verb() << "@" << slowest_kangaroo->first << " %" << m_group_seqno << " KANGAROO->HORSE";
swap(output, slowest_kangaroo->second.packet);
return output;
}
if (Verbose::on)
{
if (!elephants.empty())
{
Verb() << "ALL LINKS ELEPHANTS. Re-polling.";
}
else
{
Verb() << "ONLY BROKEN WERE REPORTED. Re-polling.";
}
}
goto RETRY_READING;
}
srt_epoll_clear_usocks(srt_epoll);
bool have_connectors = false, have_ready = false;
for (auto& d: m_group_data)
{
if (d.status < SRTS_CONNECTED)
{
int modes = SRT_EPOLL_IN | SRT_EPOLL_OUT;
srt_epoll_add_usock(srt_epoll, d.id, &modes);
have_connectors = true;
}
else if (d.status == SRTS_CONNECTED)
{
have_ready = true;
}
}
if (have_ready || have_connectors)
{
Verb() << "(still have: " << (have_ready ? "+" : "-") << "ready, "
<< (have_connectors ? "+" : "-") << "conenctors).";
goto RETRY_READING;
}
if (have_ready)
{
Verb() << "(connected in the meantime)";
goto RETRY_READING;
}
if (have_connectors)
{
Verb() << "(waiting for pending connectors to connect)";
vector<SRTSOCKET> sready;
sready.resize(m_group_data.size());
int ready_len = m_group_data.size();
if (srt_epoll_wait(srt_epoll, sready.data(), &ready_len, 0, 0, -1, 0, 0, 0, 0) == SRT_ERROR)
{
Error("All sockets in the group disconnected");
}
goto RETRY_READING;
}
Error("No data extracted");
return output; }
#endif
MediaPacket SrtSource::Read(size_t chunk)
{
static size_t counter = 1;
bool have_group ATR_UNUSED = !m_group_nodes.empty();
bytevector data(chunk);
#ifdef SRT_OLD_APP_READER
if (have_group || m_listener_group)
{
data = GroupRead(chunk);
}
if (have_group)
{
UpdateGroupStatus(m_group_data.data(), m_group_data.size());
}
#else
SRT_MSGCTRL mctrl = srt_msgctrl_default;
bool ready = true;
int stat;
do
{
#if ENABLE_EXPERIMENTAL_BONDING
if (have_group || m_listener_group)
{
mctrl.grpdata = m_group_data.data();
mctrl.grpdata_size = m_group_data.size();
}
#endif
::transmit_throw_on_interrupt = true;
stat = srt_recvmsg2(m_sock, data.data(), chunk, &mctrl);
::transmit_throw_on_interrupt = false;
if (stat == SRT_ERROR)
{
if (!m_blocking_mode)
{
if (srt_getlasterror(NULL) == SRT_EASYNCRCV)
{
Epoll_again:
Verb() << "AGAIN: - waiting for data by epoll(" << srt_epoll << ")...";
int len = 2;
SRT_EPOLL_EVENT sready[2];
len = srt_epoll_uwait(srt_epoll, sready, len, -1);
if (len != -1)
{
Verb() << "... epoll reported ready " << len << " sockets";
bool any_read_ready = false;
for (int i = 0; i < len; ++i)
{
if (sready[i].events & SRT_EPOLL_UPDATE)
{
Verb() << "... [BROKEN CONNECTION reported on @" << sready[i].fd << "]";
}
if (sready[i].events & SRT_EPOLL_IN)
any_read_ready = true;
}
if (!any_read_ready)
{
Verb() << " ... [NOT READ READY - AGAIN]";
goto Epoll_again;
}
continue;
}
}
}
Error("srt_recvmsg2");
}
if (stat == 0)
{
throw ReadEOF(hostport_copy);
}
#if PLEASE_LOG
extern srt_logging::Logger applog;
LOGC(applog.Debug, log << "recv: #" << mctrl.msgno << " %" << mctrl.pktseq << " "
<< BufferStamp(data.data(), stat) << " BELATED: " << ((CTimer::getTime()-mctrl.srctime)/1000.0) << "ms");
#endif
Verb() << "(#" << mctrl.msgno << " %" << mctrl.pktseq << " " << BufferStamp(data.data(), stat) << ") " << VerbNoEOL;
}
while (!ready);
chunk = size_t(stat);
if (chunk < data.size())
data.resize(chunk);
const bool need_bw_report = transmit_bw_report && int(counter % transmit_bw_report) == transmit_bw_report - 1;
const bool need_stats_report = transmit_stats_report && counter % transmit_stats_report == transmit_stats_report - 1;
#if ENABLE_EXPERIMENTAL_BONDING
if (have_group) {
UpdateGroupStatus(mctrl.grpdata, mctrl.grpdata_size);
if (transmit_stats_writer && (need_stats_report || need_bw_report))
{
PrintSrtStats(m_sock, need_stats_report, need_bw_report, need_stats_report);
for (size_t i = 0; i < mctrl.grpdata_size; ++i)
PrintSrtStats(mctrl.grpdata[i].id, need_stats_report, need_bw_report, need_stats_report);
}
}
else
#endif
{
if (transmit_stats_writer && (need_stats_report || need_bw_report))
{
PrintSrtStats(m_sock, need_stats_report, need_bw_report, need_stats_report);
}
}
#endif
++counter;
return MediaPacket(data, mctrl.srctime);
}
SrtTarget::SrtTarget(std::string host, int port, std::string path, const std::map<std::string,std::string>& par)
{
Init(host, port, path, par, SRT_EPOLL_OUT);
}
int SrtTarget::ConfigurePre(SRTSOCKET sock)
{
int result = SrtCommon::ConfigurePre(sock);
if (result == -1)
return result;
int yes = 1;
result = srt_setsockopt(sock, 0, SRTO_SENDER, &yes, sizeof yes);
if (result == -1)
return result;
return 0;
}
void SrtTarget::Write(const MediaPacket& data)
{
static int counter = 1;
::transmit_throw_on_interrupt = true;
if (!m_blocking_mode)
{
Epoll_again:
int len = 2;
SRT_EPOLL_EVENT sready[2];
len = srt_epoll_uwait(srt_epoll, sready, len, -1);
if (len != -1)
{
bool any_write_ready = false;
for (int i = 0; i < len; ++i)
{
if (sready[i].events & SRT_EPOLL_UPDATE)
{
Verb() << "... [BROKEN CONNECTION reported on @" << sready[i].fd << "]";
}
if (sready[i].events & SRT_EPOLL_OUT)
any_write_ready = true;
}
if (!any_write_ready)
{
Verb() << " ... [NOT WRITE READY - AGAIN]";
goto Epoll_again;
}
}
else
{
Error("srt_epoll_uwait");
}
}
SRT_MSGCTRL mctrl = srt_msgctrl_default;
#if ENABLE_EXPERIMENTAL_BONDING
bool have_group = !m_group_nodes.empty();
if (have_group || m_listener_group)
{
mctrl.grpdata = m_group_data.data();
mctrl.grpdata_size = m_group_data.size();
}
#endif
if (transmit_use_sourcetime)
{
mctrl.srctime = data.time;
}
int stat = srt_sendmsg2(m_sock, data.payload.data(), data.payload.size(), &mctrl);
if (stat == SRT_ERROR)
Error("srt_sendmsg");
::transmit_throw_on_interrupt = false;
const bool need_bw_report = transmit_bw_report && int(counter % transmit_bw_report) == transmit_bw_report - 1;
const bool need_stats_report = transmit_stats_report && counter % transmit_stats_report == transmit_stats_report - 1;
#if ENABLE_EXPERIMENTAL_BONDING
if (have_group)
{
UpdateGroupStatus(mctrl.grpdata, mctrl.grpdata_size);
if (transmit_stats_writer && (need_stats_report || need_bw_report))
{
PrintSrtStats(m_sock, need_stats_report, need_bw_report, need_stats_report);
for (size_t i = 0; i < mctrl.grpdata_size; ++i)
PrintSrtStats(mctrl.grpdata[i].id, need_stats_report, need_bw_report, need_stats_report);
}
}
else
#endif
{
if (transmit_stats_writer && (need_stats_report || need_bw_report))
{
PrintSrtStats(m_sock, need_stats_report, need_bw_report, need_stats_report);
}
}
Verb() << "(#" << mctrl.msgno << " %" << mctrl.pktseq << " " << BufferStamp(data.payload.data(), data.payload.size()) << ") " << VerbNoEOL;
++counter;
}
SrtRelay::SrtRelay(std::string host, int port, std::string path, const std::map<std::string,std::string>& par)
{
Init(host, port, path, par, SRT_EPOLL_IN | SRT_EPOLL_OUT);
}
SrtModel::SrtModel(string host, int port, map<string,string> par)
{
InitParameters(host, "", par);
if (m_mode == "caller")
is_caller = true;
else if (m_mode == "rendezvous")
is_rend = true;
else if (m_mode != "listener")
throw std::invalid_argument("Wrong 'mode' attribute; expected: caller, listener, rendezvous");
m_host = host;
m_port = port;
}
void SrtModel::Establish(std::string& w_name)
{
if (is_rend)
{
OpenRendezvous(m_adapter, m_host, m_port);
}
else if (is_caller)
{
PrepareClient();
if (w_name != "")
{
Verb() << "Connect with requesting stream [" << w_name << "]";
srt::setstreamid(m_sock, w_name);
}
else
{
Verb() << "NO STREAM ID for SRT connection";
}
if (m_outgoing_port)
{
Verb() << "Setting outgoing port: " << m_outgoing_port;
SetupAdapter("", m_outgoing_port);
}
ConnectClient(m_host, m_port);
if (m_outgoing_port == 0)
{
sockaddr_any s(AF_INET);
int namelen = s.size();
if (srt_getsockname(Socket(), (s.get()), (&namelen)) == SRT_ERROR)
{
Error("srt_getsockname");
}
m_outgoing_port = s.hport();
Verb() << "Extracted outgoing port: " << m_outgoing_port;
}
}
else
{
if (Listener() == SRT_INVALID_SOCK)
{
Verb() << "Setting up listener: port=" << m_port << " backlog=5";
PrepareListener(m_adapter, m_port, 5);
}
Verb() << "Accepting a client...";
AcceptNewClient();
w_name = UDT::getstreamid(m_sock);
Verb() << "... GOT CLIENT for stream [" << w_name << "]";
}
}
template <class Iface> struct Srt;
template <> struct Srt<Source> { typedef SrtSource type; };
template <> struct Srt<Target> { typedef SrtTarget type; };
template <> struct Srt<Relay> { typedef SrtRelay type; };
template <class Iface>
Iface* CreateSrt(const string& host, int port, std::string path, const map<string,string>& par)
{ return new typename Srt<Iface>::type (host, port, path, par); }
MediaPacket ConsoleRead(size_t chunk)
{
bytevector data(chunk);
bool st = cin.read(data.data(), chunk).good();
chunk = cin.gcount();
if (chunk == 0 && !st)
return bytevector();
int64_t stime = 0;
if (transmit_use_sourcetime)
stime = srt_time_now();
if (chunk < data.size())
data.resize(chunk);
if (data.empty())
throw Source::ReadEOF("CONSOLE device");
return MediaPacket(data, stime);
}
class ConsoleSource: public virtual Source
{
public:
ConsoleSource()
{
}
MediaPacket Read(size_t chunk) override
{
return ConsoleRead(chunk);
}
bool IsOpen() override { return cin.good(); }
bool End() override { return cin.eof(); }
};
class ConsoleTarget: public virtual Target
{
public:
ConsoleTarget()
{
}
void Write(const MediaPacket& data) override
{
cout.write(data.payload.data(), data.payload.size());
}
bool IsOpen() override { return cout.good(); }
bool Broken() override { return cout.eof(); }
};
class ConsoleRelay: public Relay, public ConsoleSource, public ConsoleTarget
{
public:
ConsoleRelay() = default;
bool IsOpen() override { return cin.good() && cout.good(); }
};
template <class Iface> struct Console;
template <> struct Console<Source> { typedef ConsoleSource type; };
template <> struct Console<Target> { typedef ConsoleTarget type; };
template <> struct Console<Relay> { typedef ConsoleRelay type; };
template <class Iface>
Iface* CreateConsole() { return new typename Console<Iface>::type (); }
SocketOption udp_options [] {
{ "iptos", IPPROTO_IP, IP_TOS, SocketOption::PRE, SocketOption::INT, nullptr },
{ "mcloop", IPPROTO_IP, IP_MULTICAST_LOOP, SocketOption::PRE, SocketOption::INT, nullptr }
};
static inline bool IsMulticast(in_addr adr)
{
unsigned char* abytes = (unsigned char*)&adr.s_addr;
unsigned char c = abytes[0];
return c >= 224 && c <= 239;
}
class UdpCommon
{
protected:
int m_sock = -1;
sockaddr_any sadr;
string adapter;
map<string, string> m_options;
void Setup(string host, int port, map<string,string> attr)
{
m_sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
if (m_sock == -1)
Error(SysError(), "UdpCommon::Setup: socket");
int yes = 1;
::setsockopt(m_sock, SOL_SOCKET, SO_REUSEADDR, (const char*)&yes, sizeof yes);
sadr = CreateAddr(host, port);
bool is_multicast = false;
if (sadr.family() == AF_INET)
{
if (attr.count("multicast"))
{
if (!IsMulticast(sadr.sin.sin_addr))
{
throw std::runtime_error("UdpCommon: requested multicast for a non-multicast-type IP address");
}
is_multicast = true;
}
else if (IsMulticast(sadr.sin.sin_addr))
{
is_multicast = true;
}
if (is_multicast)
{
ip_mreq_source mreq_ssm;
ip_mreq mreq;
sockaddr_any maddr;
int opt_name;
void* mreq_arg_ptr;
socklen_t mreq_arg_size;
adapter = attr.count("adapter") ? attr.at("adapter") : string();
if (adapter == "")
{
Verb() << "Multicast: home address: INADDR_ANY:" << port;
maddr.sin.sin_family = AF_INET;
maddr.sin.sin_addr.s_addr = htonl(INADDR_ANY);
maddr.sin.sin_port = htons(port); }
else
{
Verb() << "Multicast: home address: " << adapter << ":" << port;
maddr = CreateAddr(adapter, port);
}
if (attr.count("source"))
{
opt_name = IP_ADD_SOURCE_MEMBERSHIP;
mreq_ssm.imr_multiaddr.s_addr = sadr.sin.sin_addr.s_addr;
mreq_ssm.imr_interface.s_addr = maddr.sin.sin_addr.s_addr;
inet_pton(AF_INET, attr.at("source").c_str(), &mreq_ssm.imr_sourceaddr);
mreq_arg_size = sizeof(mreq_ssm);
mreq_arg_ptr = &mreq_ssm;
}
else
{
opt_name = IP_ADD_MEMBERSHIP;
mreq.imr_multiaddr.s_addr = sadr.sin.sin_addr.s_addr;
mreq.imr_interface.s_addr = maddr.sin.sin_addr.s_addr;
mreq_arg_size = sizeof(mreq);
mreq_arg_ptr = &mreq;
}
#ifdef _WIN32
const char* mreq_arg = (const char*)mreq_arg_ptr;
const auto status_error = SOCKET_ERROR;
#else
const void* mreq_arg = mreq_arg_ptr;
const auto status_error = -1;
#endif
#if defined(_WIN32) || defined(__CYGWIN__)
sadr = maddr;
int reuse = 1;
int shareAddrRes = setsockopt(m_sock, SOL_SOCKET, SO_REUSEADDR, reinterpret_cast<const char*>(&reuse), sizeof(reuse));
if (shareAddrRes == status_error)
{
throw runtime_error("marking socket for shared use failed");
}
Verb() << "Multicast(Windows): will bind to home address";
#else
Verb() << "Multicast(POSIX): will bind to IGMP address: " << host;
#endif
int res = setsockopt(m_sock, IPPROTO_IP, opt_name, mreq_arg, mreq_arg_size);
if (res == status_error)
{
Error(errno, "adding to multicast membership failed");
}
attr.erase("multicast");
attr.erase("adapter");
}
}
if (attr.count("ttl"))
{
int ttl = stoi(attr.at("ttl"));
int res = setsockopt(m_sock, IPPROTO_IP, IP_TTL, (const char*)&ttl, sizeof ttl);
if (res == -1)
Verb() << "WARNING: failed to set 'ttl' (IP_TTL) to " << ttl;
res = setsockopt(m_sock, IPPROTO_IP, IP_MULTICAST_TTL, (const char*)&ttl, sizeof ttl);
if (res == -1)
Verb() << "WARNING: failed to set 'ttl' (IP_MULTICAST_TTL) to " << ttl;
attr.erase("ttl");
}
m_options = attr;
for (auto o: udp_options)
{
if (m_options.count(o.name))
{
string value = m_options.at(o.name);
bool ok = o.apply<SocketOption::SYSTEM>(m_sock, value);
if (!ok)
Verb() << "WARNING: failed to set '" << o.name << "' to " << value;
}
}
}
void Error(int err, string src)
{
char buf[512];
string message = SysStrError(err, buf, 512u);
if (Verbose::on)
Verb() << "FAILURE\n" << src << ": [" << err << "] " << message;
else
cerr << "\nERROR #" << err << ": " << message << endl;
throw TransmissionError("error: " + src + ": " + message);
}
~UdpCommon()
{
#ifdef _WIN32
if (m_sock != -1)
{
shutdown(m_sock, SD_BOTH);
closesocket(m_sock);
m_sock = -1;
}
#else
close(m_sock);
#endif
}
};
class UdpSource: public virtual Source, public virtual UdpCommon
{
bool eof = true;
public:
UdpSource(string host, int port, const map<string,string>& attr)
{
Setup(host, port, attr);
int stat = ::bind(m_sock, sadr.get(), sadr.size());
if (stat == -1)
Error(SysError(), "Binding address for UDP");
eof = false;
}
MediaPacket Read(size_t chunk) override
{
bytevector data(chunk);
sockaddr_any sa(sadr.family());
int64_t srctime = 0;
int stat = recvfrom(m_sock, data.data(), (int) chunk, 0, sa.get(), &sa.syslen());
if (transmit_use_sourcetime)
{
srctime = srt_time_now();
}
if (stat == -1)
Error(SysError(), "UDP Read/recvfrom");
if (stat < 1)
{
eof = true;
return bytevector();
}
chunk = size_t(stat);
if (chunk < data.size())
data.resize(chunk);
return MediaPacket(data, srctime);
}
bool IsOpen() override { return m_sock != -1; }
bool End() override { return eof; }
};
class UdpTarget: public virtual Target, public virtual UdpCommon
{
public:
UdpTarget(string host, int port, const map<string,string>& attr )
{
Setup(host, port, attr);
if (adapter != "")
{
auto maddr = CreateAddr(adapter, 0);
in_addr addr = maddr.sin.sin_addr;
int res = setsockopt(m_sock, IPPROTO_IP, IP_MULTICAST_IF, reinterpret_cast<const char*>(&addr), sizeof(addr));
if (res == -1)
{
Error(SysError(), "setsockopt/IP_MULTICAST_IF: " + adapter);
}
}
}
void Write(const MediaPacket& data) override
{
int stat = sendto(m_sock, data.payload.data(), data.payload.size(), 0, (sockaddr*)&sadr, sizeof sadr);
if (stat == -1)
Error(SysError(), "UDP Write/sendto");
}
bool IsOpen() override { return m_sock != -1; }
bool Broken() override { return false; }
};
class UdpRelay: public Relay, public UdpSource, public UdpTarget
{
public:
UdpRelay(string host, int port, const map<string,string>& attr):
UdpSource(host, port, attr),
UdpTarget(host, port, attr)
{
}
bool IsOpen() override { return m_sock != -1; }
};
template <class Iface> struct Udp;
template <> struct Udp<Source> { typedef UdpSource type; };
template <> struct Udp<Target> { typedef UdpTarget type; };
template <> struct Udp<Relay> { typedef UdpRelay type; };
template <class Iface>
Iface* CreateUdp(const string& host, int port, const map<string,string>& par) { return new typename Udp<Iface>::type (host, port, par); }
template<class Base>
inline bool IsOutput() { return false; }
template<>
inline bool IsOutput<Target>() { return true; }
template <class Base>
extern unique_ptr<Base> CreateMedium(const string& uri)
{
unique_ptr<Base> ptr;
UriParser u(uri);
int iport = 0;
switch ( u.type() )
{
default:
break; case UriParser::FILE:
if (u.host() == "con" || u.host() == "console")
{
if ( IsOutput<Base>() && (
(Verbose::on && Verbose::cverb == &cout)
|| transmit_bw_report || transmit_stats_report) )
{
cerr << "ERROR: file://con with -v or -r or -s would result in mixing the data and text info.\n";
cerr << "ERROR: HINT: you can stream through a FIFO (named pipe)\n";
throw invalid_argument("incorrect parameter combination");
}
ptr.reset( CreateConsole<Base>() );
}
else
ptr.reset( CreateFile<Base>(u.path()));
break;
case UriParser::SRT:
ptr.reset( CreateSrt<Base>(u.host(), u.portno(), u.path(), u.parameters()) );
break;
case UriParser::UDP:
iport = atoi(u.port().c_str());
if (iport < 1024)
{
cerr << "Port value invalid: " << iport << " - must be >=1024\n";
throw invalid_argument("Invalid port number");
}
ptr.reset( CreateUdp<Base>(u.host(), iport, u.parameters()) );
break;
}
if (ptr)
ptr->uri = move(u);
return ptr;
}
std::unique_ptr<Source> Source::Create(const std::string& url)
{
return CreateMedium<Source>(url);
}
std::unique_ptr<Target> Target::Create(const std::string& url)
{
return CreateMedium<Target>(url);
}