#include "platform_sys.h"
#include <exception>
#include <stdexcept>
#include <typeinfo>
#include <iterator>
#include <vector>
#include <cstring>
#include "utilities.h"
#include "netinet_any.h"
#include "api.h"
#include "core.h"
#include "epoll.h"
#include "logging.h"
#include "threadname.h"
#include "srt.h"
#include "udt.h"
#ifdef _WIN32
#include <win/wintime.h>
#endif
#ifdef _MSC_VER
#pragma warning(error: 4530)
#endif
using namespace std;
using namespace srt_logging;
using namespace srt::sync;
extern LogConfig srt_logger_config;
// Common construction step: clears the group membership fields (bonding
// builds only) and sets up the synchronization objects owned by the socket.
void CUDTSocket::construct()
{
#if ENABLE_EXPERIMENTAL_BONDING
    // A freshly constructed socket is not a member of any group.
    m_IncludedIter = CUDTGroup::gli_NULL();
    m_IncludedGroup = NULL;
#endif

    // Accept lock/condition pair, then the control lock.
    setupMutex(m_AcceptLock, "Accept");
    setupCond(m_AcceptCond, "Accept");

    setupMutex(m_ControlLock, "Control");
}
// Destroys the socket: deletes the owned core object and the two socket-ID
// sets, then releases the sync objects set up in construct().
CUDTSocket::~CUDTSocket()
{
// The core object is deleted first and the pointer is reset right away.
delete m_pUDT;
m_pUDT = NULL;
// These sets are allocated in CUDTUnited::listen().
// NOTE(review): presumably they are NULL for non-listening sockets - confirm
// against the constructor, which is not visible here.
delete m_pQueuedSockets;
delete m_pAcceptSockets;
releaseMutex(m_AcceptLock);
releaseCond(m_AcceptCond);
releaseMutex(m_ControlLock);
}
// Returns the effective status of the socket. The stored m_Status is
// overridden with SRTS_BROKEN when the core reports a broken connection, or
// when the socket is still marked as connecting although the core is neither
// connecting nor connected.
SRT_SOCKSTATUS CUDTSocket::getStatus()
{
    if (m_pUDT->m_bBroken)
        return SRTS_BROKEN;

    if (m_Status == SRTS_CONNECTING)
    {
        // "Connecting" is only credible while the core confirms it.
        const bool core_active = m_pUDT->m_bConnecting || m_pUDT->m_bConnected;
        if (!core_active)
            return SRTS_BROKEN;
    }

    return m_Status;
}
// Shuts the socket down: in bonding builds it is first removed from the group
// it belongs to (if any), then the core object is closed internally.
void CUDTSocket::makeShutdown()
{
#if ENABLE_EXPERIMENTAL_BONDING
// Group membership is dropped before the core is closed.
if (m_IncludedGroup)
{
HLOGC(smlog.Debug, log << "@" << m_SocketID << " IS MEMBER OF $" << m_IncludedGroup->id() << " - REMOVING FROM GROUP");
removeFromGroup(true);
}
#endif
HLOGC(smlog.Debug, log << "@" << m_SocketID << " CLOSING AS SOCKET");
m_pUDT->closeInternal();
}
// Forces the socket into the closed state: marks the core as broken, performs
// the shutdown, sets the status to SRTS_CLOSED and records the closure time.
void CUDTSocket::makeClosed()
{
// The broken flag is raised before the shutdown is performed.
m_pUDT->m_bBroken = true;
makeShutdown();
m_Status = SRTS_CLOSED;
// Timestamp of the closure.
// NOTE(review): presumably used by the garbage collector to decide when the
// socket can be removed - confirm, the GC code is not visible here.
m_tsClosureTimeStamp = steady_clock::now();
}
// Tells whether a read-type operation on this socket would not block:
//  - connected socket: the receiver buffer has data ready,
//  - listening socket: at least one connection is queued for accept,
//  - otherwise: the socket is broken (see broken()).
bool CUDTSocket::readReady()
{
    const bool has_data = m_pUDT->m_bConnected && m_pUDT->m_pRcvBuffer->isRcvDataReady();
    if (has_data)
        return true;

    if (!m_pUDT->m_bListening)
        return broken();

    // Listener: readiness means a pending connection in the queue.
    return !m_pQueuedSockets->empty();
}
// Tells whether a write-type operation on this socket would not block:
// either the socket is connected and the sender buffer holds less than the
// configured send buffer size, or the socket is broken (see broken()).
bool CUDTSocket::writeReady()
{
    if (m_pUDT->m_bConnected
            && m_pUDT->m_pSndBuffer->getCurrBufSize() < m_pUDT->m_iSndBufSize)
    {
        return true;
    }

    return broken();
}
// A socket counts as broken when the core is flagged broken or when it is
// not connected.
bool CUDTSocket::broken()
{
    if (m_pUDT->m_bBroken)
        return true;

    return !m_pUDT->m_bConnected;
}
// Creates the global socket manager: picks a random starting point for the
// socket ID generator, sets up the manager's mutexes and allocates the cache.
CUDTUnited::CUDTUnited():
    m_Sockets(),
    m_GlobControlLock(),
    m_IDLock(),
    m_mMultiplexer(),
    m_MultiplexerLock(),
    m_pCache(NULL),
    m_bClosing(false),
    m_GCStopCond(),
    m_InitLock(),
    m_iInstanceCount(0),
    m_bGCStatus(false),
    m_ClosedSockets()
{
    // Seed the C random generator with the microsecond part of the current time.
    timeval now;
    gettimeofday(&now, 0);
    srand((unsigned int)now.tv_usec);

    // Scale a random fraction from [0, 1] onto the socket ID range.
    const double fraction = double(rand())/RAND_MAX;
    m_SocketIDGenerator = 1 + int(MAX_SOCKET_VAL * fraction);

    // Remember the starting value (generateSocketID() compares against it).
    m_SocketIDGenerator_init = m_SocketIDGenerator;

    setupMutex(m_GlobControlLock, "GlobControl");
    setupMutex(m_IDLock, "ID");
    setupMutex(m_InitLock, "Init");

    m_pCache = new CCache<CInfoBlock>;
}
// Destroys the global socket manager. If the garbage collector is still
// marked as running, cleanup() is called first; then the mutexes set up in
// the constructor are released and the cache is deleted.
CUDTUnited::~CUDTUnited()
{
    // m_bGCStatus is set by a successful startup() and cleared by cleanup().
    if (m_bGCStatus)
        cleanup();

    releaseMutex(m_GlobControlLock);
    releaseMutex(m_IDLock);
    releaseMutex(m_InitLock);

    delete m_pCache;
}
// Builds the log prefix for a socket in the form "@<id>:".
// For a zero ID an empty string is returned.
std::string CUDTUnited::CONID(SRTSOCKET sock)
{
    if (sock != 0)
    {
        std::ostringstream prefix;
        prefix << "@" << sock << ":";
        return prefix.str();
    }

    return "";
}
// Initializes the library instance. Every call increments the instance
// counter; only the first one does the actual work.
//
// Returns 0 when initialization was performed by this call, 1 when the
// library was already started, -1 when setting up the GC sync objects or
// starting the GC thread failed.
// On Windows throws CUDTException(MJ_SETUP, MN_NONE) when WSAStartup fails.
int CUDTUnited::startup()
{
    ScopedLock init_guard(m_InitLock);

    // The counter is bumped unconditionally; only the very first call proceeds.
    const bool started_before = (m_iInstanceCount++ > 0);
    if (started_before)
        return 1;

#ifdef _WIN32
    // Request Winsock 2.2.
    const WORD winsock_version = MAKEWORD(2, 2);
    WSADATA wsaData;
    if (WSAStartup(winsock_version, &wsaData) != 0)
        throw CUDTException(MJ_SETUP, MN_NONE, WSAGetLastError());
#endif

    PacketFilter::globalInit();

    // The garbage collector is already running: nothing more to do.
    if (m_bGCStatus)
        return 1;

    m_bClosing = false;

    try
    {
        setupMutex(m_GCStopLock, "GCStop");
        setupCond(m_GCStopCond, "GCStop");
    }
    catch (...)
    {
        return -1;
    }

    const bool gc_started = StartThread(m_GCThread, garbageCollect, this, "SRT:GC");
    if (!gc_started)
        return -1;

    m_bGCStatus = true;
    return 0;
}
// Releases one library instance. When the last instance is released and the
// garbage collector is running, the GC thread is asked to stop and joined.
// Always returns 0.
int CUDTUnited::cleanup()
{
    ScopedLock init_guard(m_InitLock);

    // Other users of the library remain, or the GC was never started.
    --m_iInstanceCount;
    if (m_iInstanceCount > 0 || !m_bGCStatus)
        return 0;

    // Raise the closing flag, wake the GC thread up and wait until it exits.
    m_bClosing = true;
    HLOGC(inlog.Debug, log << "GarbageCollector: thread EXIT");
    CSync::signal_relaxed(m_GCStopCond);
    m_GCThread.join();

    // The condition variable is released on non-Windows platforms only.
#ifndef _WIN32
    releaseCond(m_GCStopCond);
#endif

    m_bGCStatus = false;

#ifdef _WIN32
    WSACleanup();
#endif

    return 0;
}
// Generates a new socket (or group) ID.
//
// IDs are handed out by counting down from a random initial value. When the
// counter would drop to zero or below, it rolls over to MAX_SOCKET_VAL-1.
// Once the counter meets the initial value again, every candidate is checked
// against the existing sockets (or groups, when for_group is set) and values
// in use are skipped.
//
// @param for_group  when true, the returned value has SRTGROUP_MASK set and
//                   (in bonding builds) the in-use check is done on groups.
// @return the new ID; the internal counter always stays without the group bit.
// @throws CUDTException(MJ_SYSTEMRES, MN_MEMORY) when every ID is in use.
SRTSOCKET CUDTUnited::generateSocketID(bool for_group)
{
    ScopedLock guard(m_IDLock);

    int sockval = m_SocketIDGenerator - 1;

    // Zero and negative values must never be handed out.
    if (sockval <= 0)
    {
        // Rollover. The candidate value itself must be rolled too, not only
        // the stored counter: otherwise the non-positive value is written
        // back into the counter below and returned to the caller (ID 0 first,
        // then -1). The rollover value matches the one used in the search
        // loop below.
        sockval = MAX_SOCKET_VAL-1;
        m_SocketIDGenerator = sockval;
    }

    // The counter came back to the initial value: from now on the whole
    // range may have been used once, so in-use checks are required.
    if (sockval == m_SocketIDGenerator_init)
    {
        m_SocketIDGenerator_init = 0;
    }

    if (m_SocketIDGenerator_init == 0)
    {
        const int startval = sockval;

        // Roll until an unused value is found.
        for (;;)
        {
            enterCS(m_GlobControlLock);
#if ENABLE_EXPERIMENTAL_BONDING
            const bool exists = for_group
                ? m_Groups.count(sockval | SRTGROUP_MASK) != 0
                : m_Sockets.count(sockval) != 0;
#else
            const bool exists = m_Sockets.count(sockval) != 0;
#endif
            leaveCS(m_GlobControlLock);

            if (!exists)
            {
                // This ID is free to use.
                m_SocketIDGenerator = sockval;
                break;
            }

            // The value is in use; try the next one down, with rollover.
            --sockval;
            if (sockval <= 0)
                sockval = MAX_SOCKET_VAL-1;

            // A full circle was made without finding a free value.
            if (sockval == startval)
            {
                // Leave the counter so that any next call fails the same way.
                m_SocketIDGenerator = sockval+1;
                throw CUDTException(MJ_SYSTEMRES, MN_MEMORY, 0);
            }
        }
    }
    else
    {
        m_SocketIDGenerator = sockval;
    }

    // Only the returned value may carry the group bit.
    if (for_group)
        sockval = m_SocketIDGenerator | SRTGROUP_MASK;
    else
        sockval = m_SocketIDGenerator;

    LOGC(smlog.Debug, log << "generateSocketID: " << (for_group ? "(group)" : "") << ": @" << sockval);

    return sockval;
}
// Creates a new socket together with its core object, assigns it a fresh ID
// and registers it in the socket map.
//
// @param pps  optional; when not NULL, receives the pointer to the new socket.
// @return the ID of the new socket.
// @throws CUDTException(MJ_SYSTEMRES, MN_MEMORY) when allocation or
//         registration fails; exceptions from generateSocketID() are passed on.
SRTSOCKET CUDTUnited::newSocket(CUDTSocket** pps)
{
    CUDTSocket* sock = NULL;

    try
    {
        sock = new CUDTSocket;
        sock->m_pUDT = new CUDT(sock);
    }
    catch (...)
    {
        delete sock;
        throw CUDTException(MJ_SYSTEMRES, MN_MEMORY, 0);
    }

    try
    {
        sock->m_SocketID = generateSocketID();
    }
    catch (...)
    {
        // Drop the half-made socket and pass the original exception on.
        delete sock;
        throw;
    }

    sock->m_Status = SRTS_INIT;
    sock->m_ListenSocket = 0;
    sock->m_pUDT->m_SocketID = sock->m_SocketID;
    sock->m_pUDT->m_pCache = m_pCache;

    try
    {
        HLOGC(smlog.Debug, log << CONID(sock->m_SocketID)
                << "newSocket: mapping socket "
                << sock->m_SocketID);

        // The global lock is held only for the map insertion.
        ScopedLock cs(m_GlobControlLock);
        m_Sockets[sock->m_SocketID] = sock;
    }
    catch (...)
    {
        // Registration failed: the socket is unusable.
        delete sock;
        throw CUDTException(MJ_SYSTEMRES, MN_MEMORY, 0);
    }

    if (pps)
        *pps = sock;

    return sock->m_SocketID;
}
// Handles a connection request received by the listener socket `listen`.
//
// @param listen   ID of the listener socket that received the request.
// @param peer     address of the connecting peer.
// @param hspkt    the handshake packet, passed on to the accept hook and to
//                 acceptAndRespond().
// @param w_hs     [in/out] the handshake data; rewritten from the existing
//                 socket when a working connection to this peer is found.
// @param w_error  [out] rejection reason (SRT_REJ_*) when -1 is returned.
// @return  1 when a new socket was created and connected,
//          0 when a working socket for this peer already exists,
//         -1 when the connection was rejected.
int CUDTUnited::newConnection(const SRTSOCKET listen, const sockaddr_any& peer, const CPacket& hspkt,
CHandShake& w_hs, int& w_error)
{
CUDTSocket* ns = NULL;
// Default rejection reason until a more specific one is determined.
w_error = SRT_REJ_IPE;
CUDTSocket* ls = locateSocket(listen);
if (!ls)
{
LOGC(cnlog.Error, log << "IPE: newConnection by listener socket id=" << listen << " which DOES NOT EXIST.");
return -1;
}
HLOGC(cnlog.Debug, log << "newConnection: creating new socket after listener @"
<< listen << " contacted with backlog=" << ls->m_uiBackLog);
// Check whether a socket for this peer (same address, peer ID and ISN) exists already.
if ((ns = locatePeer(peer, w_hs.m_iID, w_hs.m_iISN)) != NULL)
{
if (ns->m_pUDT->m_bBroken)
{
// The previous connection is broken: mark it closed and remove it from
// both listener sets; a new socket is created below.
ns->m_Status = SRTS_CLOSED;
ns->m_tsClosureTimeStamp = steady_clock::now();
ScopedLock acceptcg(ls->m_AcceptLock);
ls->m_pQueuedSockets->erase(ns->m_SocketID);
ls->m_pAcceptSockets->erase(ns->m_SocketID);
}
else
{
// The connection is alive: fill the handshake with the data of the
// existing socket and report it without creating anything.
HLOGC(cnlog.Debug, log
<< "newConnection: located a WORKING peer @"
<< w_hs.m_iID << " - ADAPTING.");
w_hs.m_iISN = ns->m_pUDT->m_iISN;
w_hs.m_iMSS = ns->m_pUDT->m_iMSS;
w_hs.m_iFlightFlagSize = ns->m_pUDT->m_iFlightFlagSize;
w_hs.m_iReqType = URQ_CONCLUSION;
w_hs.m_iID = ns->m_SocketID;
return 0;
}
}
else
{
HLOGC(cnlog.Debug, log << "newConnection: NOT located any peer @"
<< w_hs.m_iID << " - resuming with initial connection.");
}
// Reject when the queue of not yet accepted sockets has reached the backlog.
if (ls->m_pQueuedSockets->size() >= ls->m_uiBackLog)
{
w_error = SRT_REJ_BACKLOG;
LOGC(cnlog.Note, log << "newConnection: listen backlog=" << ls->m_uiBackLog << " EXCEEDED");
return -1;
}
// Create the new socket; its core is constructed from the listener's core.
try
{
ns = new CUDTSocket;
ns->m_pUDT = new CUDT(ns, *(ls->m_pUDT));
ns->m_PeerAddr = peer;
}
catch (...)
{
w_error = SRT_REJ_RESOURCE;
delete ns;
LOGC(cnlog.Error, log << "IPE: newConnection: unexpected exception (probably std::bad_alloc)");
return -1;
}
ns->m_pUDT->m_RejectReason = SRT_REJ_UNKNOWN;
try
{
ns->m_SocketID = generateSocketID();
}
catch (const CUDTException& e)
{
// w_error still holds SRT_REJ_IPE on this path.
LOGF(cnlog.Fatal, "newConnection: IPE: all sockets occupied? Last gen=%d", m_SocketIDGenerator);
delete ns;
return -1;
}
ns->m_ListenSocket = listen;
ns->m_pUDT->m_SocketID = ns->m_SocketID;
ns->m_PeerID = w_hs.m_iID;
ns->m_iISN = w_hs.m_iISN;
HLOGC(cnlog.Debug, log << "newConnection: DATA: lsnid=" << listen
<< " id=" << ns->m_pUDT->m_SocketID
<< " peerid=" << ns->m_pUDT->m_PeerID
<< " ISN=" << ns->m_iISN);
// `error` selects the message in the `why` table in the rollback section
// below; 0 means no error.
int error = 0;
// Cleared in the bonding section when another member of the same group is
// connected already.
bool should_submit_to_accept = true;
w_error = SRT_REJ_RESOURCE;
try
{
HLOGF(cnlog.Debug,
"newConnection: incoming %s, mapping socket %d",
peer.str().c_str(), ns->m_SocketID);
{
// Register the socket; the global lock is held for the insertion only.
ScopedLock cg(m_GlobControlLock);
m_Sockets[ns->m_SocketID] = ns;
}
ns->m_pUDT->open();
updateListenerMux(ns, ls);
// Let the accept hook installed on the listener (if any) decide first.
if (ls->m_pUDT->m_cbAcceptHook)
{
if (!ls->m_pUDT->runAcceptHook(ns->m_pUDT, peer.get(), w_hs, hspkt))
{
w_error = ns->m_pUDT->m_RejectReason;
error = 1;
goto ERR_ROLLBACK;
}
}
ns->m_pUDT->acceptAndRespond(ls->m_SelfAddr, peer, hspkt, (w_hs));
}
catch (...)
{
// Any exception from the steps above results in rejection with the reason
// recorded in the new socket's core.
w_error = ns->m_pUDT->m_RejectReason;
error = 1;
goto ERR_ROLLBACK;
}
ns->m_Status = SRTS_CONNECTED;
// Record the local address of the new socket.
ns->m_pUDT->m_pSndQueue->m_pChannel->getSockAddr((ns->m_SelfAddr));
CIPAddress::pton((ns->m_SelfAddr), ns->m_pUDT->m_piSelfIP, ns->m_SelfAddr.family(), peer);
// Add the socket to the peer record map, under the global lock.
enterCS(m_GlobControlLock);
try
{
HLOGF(cnlog.Debug,
"newConnection: mapping peer %d to that socket (%d)\n",
ns->m_PeerID, ns->m_SocketID);
m_PeerRec[ns->getPeerSpec()].insert(ns->m_SocketID);
}
catch (...)
{
LOGC(cnlog.Error, log << "newConnection: error when mapping peer!");
error = 2;
}
leaveCS(m_GlobControlLock);
#if ENABLE_EXPERIMENTAL_BONDING
if (ns->m_IncludedGroup)
{
CUDTGroup* g = ns->m_IncludedGroup;
ScopedLock glock (g->m_GroupLock);
CUDTGroup::gli_t gi;
// If any member of the group is connected already, this socket is not
// reported through accept.
for (gi = g->m_Group.begin(); gi != g->m_Group.end(); ++gi)
{
if (gi->laststatus == SRTS_CONNECTED)
{
HLOGC(cnlog.Debug, log << "Found another connected socket in the group: $"
<< gi->id << " - socket will be NOT given up for accepting");
should_submit_to_accept = false;
break;
}
}
// Update the group's member entry of the new socket.
gi = ns->m_IncludedIter;
HLOGC(cnlog.Debug, log << "newConnection(GROUP): Socket @" << ns->m_SocketID << " BELONGS TO $" << g->id()
<< " - will " << (should_submit_to_accept? "" : "NOT ") << "report in accept");
gi->sndstate = SRT_GST_IDLE;
gi->rcvstate = SRT_GST_IDLE;
gi->laststatus = SRTS_CONNECTED;
if (!g->m_bConnected)
{
HLOGC(cnlog.Debug, log << "newConnection(GROUP): First socket connected, SETTING GROUP CONNECTED");
g->m_bConnected = true;
}
// Remember the listener in the group (once) and subscribe it in the
// group's receiver epoll.
if (!g->m_listener)
{
g->m_listener = ls;
int listener_modes = SRT_EPOLL_ACCEPT | SRT_EPOLL_UPDATE;
srt_epoll_add_usock(g->m_RcvEID, ls->m_SocketID, &listener_modes);
}
// Subscribe the new socket in both epoll containers of the group.
int read_modes = SRT_EPOLL_IN | SRT_EPOLL_ERR;
int write_modes = SRT_EPOLL_OUT | SRT_EPOLL_ERR;
srt_epoll_add_usock(g->m_RcvEID, ns->m_SocketID, &read_modes);
srt_epoll_add_usock(g->m_SndEID, ns->m_SocketID, &write_modes);
}
else
{
HLOGC(cnlog.Debug, log << "newConnection: Socket @" << ns->m_SocketID << " is not in a group");
}
#endif
if (should_submit_to_accept)
{
// Queue the socket for accept().
enterCS(ls->m_AcceptLock);
try
{
ls->m_pQueuedSockets->insert(ns->m_SocketID);
}
catch (...)
{
LOGC(cnlog.Error, log << "newConnection: error when queuing socket!");
error = 3;
}
leaveCS(ls->m_AcceptLock);
HLOGC(cnlog.Debug, log << "ACCEPT: new socket @" << ns->m_SocketID << " submitted for acceptance");
// Raise the ACCEPT event on the listener.
m_EPoll.update_events(listen, ls->m_pUDT->m_sPollID, SRT_EPOLL_ACCEPT, true);
CGlobEvent::triggerEvent();
// This check also covers error == 2 set when mapping the peer above.
if (error > 0)
{
goto ERR_ROLLBACK;
}
// Wake up a thread waiting in accept().
CSync::lock_signal(ls->m_AcceptCond, ls->m_AcceptLock);
}
else
{
HLOGC(cnlog.Debug, log << "ACCEPT: new socket @" << ns->m_SocketID
<< " NOT submitted to acceptance, another socket in the group is already connected");
{
// The socket goes directly to the accepted set, bypassing the queue.
ScopedLock cg (ls->m_AcceptLock);
ls->m_pAcceptSockets->insert(ls->m_pAcceptSockets->end(), ns->m_SocketID);
}
// Raise the UPDATE event on the listener instead of ACCEPT.
m_EPoll.update_events(listen, ls->m_pUDT->m_sPollID, SRT_EPOLL_UPDATE, true);
CGlobEvent::triggerEvent();
}
// Common exit point; does the rollback only when `error` is set.
ERR_ROLLBACK:
if (error > 0)
{
#if ENABLE_LOGGING
// Messages indexed by the value of `error`.
static const char* why [] = {
"UNKNOWN ERROR",
"INTERNAL REJECTION",
"IPE when mapping a socket",
"IPE when inserting a socket"
};
LOGC(cnlog.Warn, log << CONID(ns->m_SocketID) << "newConnection: connection rejected due to: "
<< why[error] << " - " << RequestTypeStr(URQFailure(w_error)));
#endif
// Close the socket and move it from the active map to the closed map.
SRTSOCKET id = ns->m_SocketID;
ns->makeClosed();
{
ScopedLock cg(m_GlobControlLock);
m_Sockets.erase(id);
m_ClosedSockets[id] = ns;
}
return -1;
}
return 1;
}
// Static entry point: forwards the request to the global socket manager.
// Returns whatever CUDTUnited::installAcceptHook returns.
int CUDT::installAcceptHook(SRTSOCKET lsn, srt_listen_callback_fn* hook, void* opaq)
{
    const int status = s_UDTUnited.installAcceptHook(lsn, hook, opaq);
    return status;
}
int CUDTUnited::installAcceptHook(const SRTSOCKET lsn, srt_listen_callback_fn* hook, void* opaq)
{
try
{
CUDTSocket* s = locateSocket(lsn, ERH_THROW);
s->m_pUDT->installAcceptHook(hook, opaq);
}
catch (CUDTException& e)
{
SetThreadLocalError(e);
return SRT_ERROR;
}
return 0;
}
// Static entry point: forwards the request to the global socket manager.
// Returns whatever CUDTUnited::installConnectHook returns.
int CUDT::installConnectHook(SRTSOCKET lsn, srt_connect_callback_fn* hook, void* opaq)
{
    const int status = s_UDTUnited.installConnectHook(lsn, hook, opaq);
    return status;
}
int CUDTUnited::installConnectHook(const SRTSOCKET u, srt_connect_callback_fn* hook, void* opaq)
{
try
{
#if ENABLE_EXPERIMENTAL_BONDING
if (u & SRTGROUP_MASK)
{
CUDTGroup* g = locateGroup(u, ERH_THROW);
g->installConnectHook(hook, opaq);
return 0;
}
#endif
CUDTSocket* s = locateSocket(u, ERH_THROW);
s->m_pUDT->installConnectHook(hook, opaq);
}
catch (CUDTException& e)
{
SetThreadLocalError(e);
return SRT_ERROR;
}
return 0;
}
// Returns the status of the socket with the given ID:
//  - the socket's own status when it is in the active socket map,
//  - SRTS_CLOSED when it is found only among the closed sockets,
//  - SRTS_NONEXIST otherwise.
SRT_SOCKSTATUS CUDTUnited::getStatus(const SRTSOCKET u)
{
    // Both maps are inspected under the global lock.
    ScopedLock cg(m_GlobControlLock);

    sockets_t::const_iterator active = m_Sockets.find(u);
    if (active != m_Sockets.end())
        return active->second->getStatus();

    if (m_ClosedSockets.count(u) != 0)
        return SRTS_CLOSED;

    return SRTS_NONEXIST;
}
// Binds the socket to the given local address.
//
// @return 0 on success.
// @throws CUDTException(MJ_NOTSUP, MN_NONE) when the socket is not in the
//         SRTS_INIT state.
int CUDTUnited::bind(CUDTSocket* s, const sockaddr_any& name)
{
    ScopedLock cg(s->m_ControlLock);

    // Only a socket in its initial state can be bound.
    const bool is_fresh = (s->m_Status == SRTS_INIT);
    if (!is_fresh)
        throw CUDTException(MJ_NOTSUP, MN_NONE, 0);

    s->m_pUDT->open();
    updateMux(s, name);
    s->m_Status = SRTS_OPENED;

    // Store the address reported by the channel as the socket's own address.
    s->m_pUDT->m_pSndQueue->m_pChannel->getSockAddr((s->m_SelfAddr));

    return 0;
}
// Binds the socket to an existing UDP socket. The local address is read from
// `udpsock` with the system getsockname().
//
// @return 0 on success.
// @throws CUDTException(MJ_NOTSUP, MN_NONE) when the socket is not in the
//         SRTS_INIT state; CUDTException(MJ_NOTSUP, MN_INVAL) when the
//         address of `udpsock` cannot be obtained.
int CUDTUnited::bind(CUDTSocket* s, UDPSOCKET udpsock)
{
    ScopedLock cg(s->m_ControlLock);

    // Only a socket in its initial state can be bound.
    const bool is_fresh = (s->m_Status == SRTS_INIT);
    if (!is_fresh)
        throw CUDTException(MJ_NOTSUP, MN_NONE, 0);

    // Ask the system which address the UDP socket is bound to.
    sockaddr_any name;
    socklen_t namelen = sizeof name;
    const int gsn_result = ::getsockname(udpsock, &name.sa, &namelen);
    if (gsn_result == -1)
        throw CUDTException(MJ_NOTSUP, MN_INVAL);
    name.len = namelen;

    s->m_pUDT->open();
    updateMux(s, name, &udpsock);
    s->m_Status = SRTS_OPENED;

    // Store the address reported by the channel as the socket's own address.
    s->m_pUDT->m_pSndQueue->m_pChannel->getSockAddr((s->m_SelfAddr));

    return 0;
}
int CUDTUnited::listen(const SRTSOCKET u, int backlog)
{
if (backlog <= 0)
throw CUDTException(MJ_NOTSUP, MN_INVAL, 0);
if (u == UDT::INVALID_SOCK)
throw CUDTException(MJ_NOTSUP, MN_SIDINVAL, 0);
CUDTSocket* s = locateSocket(u);
if (!s)
throw CUDTException(MJ_NOTSUP, MN_SIDINVAL, 0);
ScopedLock cg(s->m_ControlLock);
if (s->m_Status == SRTS_LISTENING)
return 0;
if (s->m_Status != SRTS_OPENED)
throw CUDTException(MJ_NOTSUP, MN_ISUNBOUND, 0);
if (s->m_pUDT->m_bRendezvous)
throw CUDTException(MJ_NOTSUP, MN_ISRENDEZVOUS, 0);
s->m_uiBackLog = backlog;
try
{
s->m_pQueuedSockets = new set<SRTSOCKET>;
s->m_pAcceptSockets = new set<SRTSOCKET>;
}
catch (...)
{
delete s->m_pQueuedSockets;
delete s->m_pAcceptSockets;
throw CUDTException(MJ_SYSTEMRES, MN_MEMORY, 0);
}
s->m_pUDT->setListenState(); s->m_Status = SRTS_LISTENING;
return 0;
}
// Waits on several listener sockets at once and accepts a connection from
// the first one reported ready.
//
// @param listeners  array of listener socket IDs.
// @param lsize      number of elements in `listeners`.
// @param msTimeOut  timeout passed on to the epoll wait.
// @return the result of accept() on the ready listener.
// @throws CUDTException(MJ_AGAIN, MN_XMTIMEOUT) when no listener was
//         reported ready.
SRTSOCKET CUDTUnited::accept_bond(const SRTSOCKET listeners [], int lsize, int64_t msTimeOut)
{
    // Temporary epoll container used only for this call.
    CEPollDesc* ed = 0;
    int eid = m_EPoll.create(&ed);

    // Releases the temporary epoll container on every exit path.
    struct EpollReleaser
    {
        int eid;
        CUDTUnited* that;
        EpollReleaser(CUDTUnited* t, int e): eid(e), that(t) {}
        ~EpollReleaser()
        {
            that->m_EPoll.release(eid);
        }
    } releaser(this, eid);

    // Subscribe every listener for the ACCEPT event.
    int events = SRT_EPOLL_ACCEPT;
    for (int pos = 0; pos < lsize; ++pos)
        srt_epoll_add_usock(eid, listeners[pos], &events);

    CEPoll::fmap_t st;
    m_EPoll.swait(*ed, (st), msTimeOut, true);

    // Nothing reported: treated as a timeout.
    if (st.empty())
        throw CUDTException(MJ_AGAIN, MN_XMTIMEOUT, 0);

    // Take the first ready listener and accept on it.
    int lsn = st.begin()->first;

    // The peer address is not returned to the caller; it goes to a scratch buffer.
    sockaddr_storage dummy;
    int outlen = sizeof dummy;
    return accept(lsn, ((sockaddr*)&dummy), (&outlen));
}
// Accepts a pending connection from the listener socket `listen`.
//
// @param listen      ID of the listener socket.
// @param pw_addr     [out, optional] receives the peer address.
// @param pw_addrlen  [in/out] size of the pw_addr buffer on input, size of
//                    the stored address on output; required when pw_addr is given.
// @return the ID of the accepted socket, or (bonding builds) the ID of its
//         group when the listener has m_OPT_GroupConnect set to 1 and the
//         socket is a group member.
// @throws CUDTException
//         MJ_NOTSUP/MN_INVAL     - pw_addr without pw_addrlen, or buffer too small,
//         MJ_NOTSUP/MN_SIDINVAL  - unknown listener ID,
//         MJ_NOTSUP/MN_NOLISTEN  - not a listening socket, or rendezvous mode,
//         MJ_AGAIN/MN_RDAVAIL    - nothing to accept and m_bSynRecving is off,
//         MJ_SETUP/MN_CLOSED     - listener stopped listening or got broken
//                                  while waiting, or the accepted socket
//                                  cannot be located.
SRTSOCKET CUDTUnited::accept(const SRTSOCKET listen, sockaddr* pw_addr, int* pw_addrlen)
{
if (pw_addr && !pw_addrlen)
throw CUDTException(MJ_NOTSUP, MN_INVAL, 0);
CUDTSocket* ls = locateSocket(listen);
if (ls == NULL)
throw CUDTException(MJ_NOTSUP, MN_SIDINVAL, 0);
if (ls->m_Status != SRTS_LISTENING)
throw CUDTException(MJ_NOTSUP, MN_NOLISTEN, 0);
if (ls->m_pUDT->m_bRendezvous)
{
LOGC(cnlog.Fatal, log << "CUDTUnited::accept: RENDEZVOUS flag passed through check in srt_listen when it set listen state");
throw CUDTException(MJ_NOTSUP, MN_NOLISTEN, 0);
}
SRTSOCKET u = CUDT::INVALID_SOCK;
// `accepted` only ends the loop; whether a socket was really taken is told
// by the value of `u`.
bool accepted = false;
while (!accepted)
{
// The accept lock is acquired anew in every iteration.
UniqueLock accept_lock(ls->m_AcceptLock);
CSync accept_sync(ls->m_AcceptCond, accept_lock);
if ((ls->m_Status != SRTS_LISTENING) || ls->m_pUDT->m_bBroken)
{
// The listener is no longer usable: leave the loop with no socket.
accepted = true;
}
else if (ls->m_pQueuedSockets->size() > 0)
{
// Move the first queued socket to the accepted set.
u = *(ls->m_pQueuedSockets->begin());
ls->m_pAcceptSockets->insert(ls->m_pAcceptSockets->end(), u);
ls->m_pQueuedSockets->erase(ls->m_pQueuedSockets->begin());
accepted = true;
}
else if (!ls->m_pUDT->m_bSynRecving)
{
// m_bSynRecving is off: do not wait for a connection.
accepted = true;
}
if (!accepted && (ls->m_Status == SRTS_LISTENING))
accept_sync.wait();
// Nothing more in the queue: clear the ACCEPT event on the listener.
if (ls->m_pQueuedSockets->empty())
m_EPoll.update_events(listen, ls->m_pUDT->m_sPollID, SRT_EPOLL_ACCEPT, false);
}
if (u == CUDT::INVALID_SOCK)
{
// No socket was taken; the error depends on the m_bSynRecving flag.
if (!ls->m_pUDT->m_bSynRecving)
throw CUDTException(MJ_AGAIN, MN_RDAVAIL, 0);
throw CUDTException(MJ_SETUP, MN_CLOSED, 0);
}
CUDTSocket* s = locateSocket(u);
if (s == NULL)
throw CUDTException(MJ_SETUP, MN_CLOSED, 0);
#if ENABLE_EXPERIMENTAL_BONDING
if (ls->m_pUDT->m_OPT_GroupConnect == 1 && s->m_IncludedGroup)
{
// Report the group ID instead of the socket ID.
u = s->m_IncludedGroup->m_GroupID;
s->core().m_OPT_GroupConnect = 1; }
else
#endif
{
s->core().m_OPT_GroupConnect = 0;
}
ScopedLock cg(s->m_ControlLock);
if (pw_addr != NULL && pw_addrlen != NULL)
{
// Copy the peer address out; the caller's buffer must be large enough.
const int len = s->m_PeerAddr.size();
if (*pw_addrlen < len)
throw CUDTException(MJ_NOTSUP, MN_INVAL, 0);
memcpy((pw_addr), &s->m_PeerAddr, len);
*pw_addrlen = len;
}
return u;
}
// Connects the socket (or, in bonding builds, the group) `u` to `tarname`,
// using `srcname` as the local address.
//
// @param namelen  size of both address structures.
// @return result of connectIn() for a socket, or of singleMemberConnect()
//         for a group.
// @throws CUDTException
//         MJ_NOTSUP/MN_INVAL     - missing address, namelen smaller than
//                                  sockaddr_in, or an address that results
//                                  in an empty sockaddr_any,
//         MJ_NOTSUP/MN_SIDINVAL  - unknown socket ID.
int CUDTUnited::connect(SRTSOCKET u, const sockaddr* srcname, const sockaddr* tarname, int namelen)
{
    const bool args_ok = srcname && tarname && size_t(namelen) >= sizeof (sockaddr_in);
    if (!args_ok)
    {
        LOGC(aclog.Error, log << "connect(with source): invalid call: srcname="
                << srcname << " tarname=" << tarname << " namelen=" << namelen);
        throw CUDTException(MJ_NOTSUP, MN_INVAL);
    }

    // A zero length marks an address that could not be interpreted.
    sockaddr_any source_addr(srcname, namelen);
    if (source_addr.len == 0)
        throw CUDTException(MJ_NOTSUP, MN_INVAL, 0);

    sockaddr_any target_addr(tarname, namelen);
    if (target_addr.len == 0)
        throw CUDTException(MJ_NOTSUP, MN_INVAL, 0);

#if ENABLE_EXPERIMENTAL_BONDING
    // Group ID: connect as a single-member group connection.
    if (u & SRTGROUP_MASK)
    {
        CUDTGroup* grp = locateGroup(u, ERH_THROW);
        SRT_SOCKGROUPCONFIG gd[1] = { srt_prepare_endpoint(srcname, tarname, namelen) };
        return singleMemberConnect(grp, gd);
    }
#endif

    CUDTSocket* sock = locateSocket(u);
    if (sock == NULL)
        throw CUDTException(MJ_NOTSUP, MN_SIDINVAL, 0);

    // Bind to the requested source address first, then connect.
    bind(sock, source_addr);
    return connectIn(sock, target_addr, SRT_SEQNO_NONE);
}
int CUDTUnited::connect(const SRTSOCKET u, const sockaddr* name, int namelen, int32_t forced_isn)
{
sockaddr_any target_addr(name, namelen);
if (target_addr.len == 0)
throw CUDTException(MJ_NOTSUP, MN_INVAL, 0);
#if ENABLE_EXPERIMENTAL_BONDING
if (u & SRTGROUP_MASK)
{
CUDTGroup* g = locateGroup(u, ERH_THROW);
SRT_SOCKGROUPCONFIG gd[1] = { srt_prepare_endpoint(NULL, name, namelen) };
return singleMemberConnect(g, gd);
}
#endif
CUDTSocket* s = locateSocket(u);
if (!s)
throw CUDTException(MJ_NOTSUP, MN_SIDINVAL, 0);
return connectIn(s, target_addr, forced_isn);
}
#if ENABLE_EXPERIMENTAL_BONDING
// Connects a group through a single endpoint.
//
// @param pg  the group to connect.
// @param gd  the single endpoint configuration; its errorcode field is read
//            (and defaulted to SRT_EINVPARAM) when the connection fails.
// @return the result of groupConnect() on success; on failure the result of
//         CUDT::APIError() built from the endpoint's error code.
int CUDTUnited::singleMemberConnect(CUDTGroup* pg, SRT_SOCKGROUPCONFIG* gd)
{
    const int gstat = groupConnect(pg, gd, 1);
    if (gstat != -1)
        return gstat;

    // Failure with no specific code reported: fall back to a general one.
    if (gd->errorcode == SRT_SUCCESS)
        gd->errorcode = SRT_EINVPARAM;

    // The error code carries the major part in thousands, the minor in the rest.
    const CodeMajor mj = CodeMajor(gd->errorcode / 1000);
    const CodeMinor mn = CodeMinor(gd->errorcode % 1000);

    return CUDT::APIError(mj, mn);
}
// Connects a group to a set of endpoints, creating one member socket per
// endpoint.
//
// @param pg         the group; must be a managed one.
// @param targets    [in/out] array of endpoint configurations; for every
//                   entry the `id`, `errorcode` and (if it was -1) `token`
//                   fields are filled in.
// @param arraysize  number of entries in `targets`.
// @return the ID of a member socket: in blocking mode the first one found
//         connected, otherwise the last one successfully spawned.
// @throws CUDTException
//         MJ_NOTSUP/MN_INVAL         - unmanaged group, address family mismatch
//                                      or weight above CUDT::MAX_WEIGHT,
//         MJ_SYSTEMRES/MN_OBJECT     - unknown exception from connectIn(),
//         MJ_CONNECTION/MN_CONNLOST  - no member connection succeeded.
int CUDTUnited::groupConnect(CUDTGroup* pg, SRT_SOCKGROUPCONFIG* targets, int arraysize)
{
CUDTGroup& g = *pg;
if (!g.managed())
throw CUDTException(MJ_NOTSUP, MN_INVAL);
// Validate all targets up front, before any socket is created.
for (int tii = 0; tii < arraysize; ++tii)
{
if (targets[tii].srcaddr.ss_family != targets[tii].peeraddr.ss_family)
{
LOGC(aclog.Error, log << "srt_connect/group: family differs on source and target address");
throw CUDTException(MJ_NOTSUP, MN_INVAL);
}
if (targets[tii].weight > CUDT::MAX_WEIGHT)
{
LOGC(aclog.Error, log << "srt_connect/group: weight value must be between 0 and " << (+CUDT::MAX_WEIGHT));
throw CUDTException(MJ_NOTSUP, MN_INVAL);
}
}
// Blocking applies only when the group is not yet opened and has the
// m_bSynRecving flag set.
bool block_new_opened = !g.m_bOpened && g.m_bSynRecving;
const bool was_empty = g.empty();
SRTSOCKET retval = -1;
// Temporary epoll container, created only for the blocking mode.
int eid = -1;
int connect_modes = SRT_EPOLL_CONNECT | SRT_EPOLL_ERR;
if (block_new_opened)
{
eid = srt_epoll_create();
}
// Sockets whose connection was started and which were not broken right away.
map<SRTSOCKET, CUDTSocket*> spawned;
HLOGC(aclog.Debug, log << "groupConnect: will connect " << arraysize << " links and "
<< (block_new_opened ? "BLOCK until any is ready" : "leave the process in background"));
// Create, configure and start connecting one socket per target.
for (int tii = 0; tii < arraysize; ++tii)
{
sockaddr_any target_addr(targets[tii].peeraddr);
sockaddr_any source_addr(targets[tii].srcaddr);
// References to the output fields of the current target entry.
SRTSOCKET& sid_rloc = targets[tii].id;
int& erc_rloc = targets[tii].errorcode;
erc_rloc = SRT_SUCCESS; HLOGC(aclog.Debug, log << "groupConnect: taking on " << sockaddr_any(targets[tii].peeraddr).str());
CUDTSocket* ns = 0;
SRTSOCKET sid = newSocket(&ns);
// The connect hook of the group is passed on to the member socket.
if (pg->m_cbConnectHook)
{
ns->m_pUDT->m_cbConnectHook = pg->m_cbConnectHook;
}
SRT_SocketOptionObject* config = targets[tii].config;
// Describes the step being done; used in the error log below.
string error_reason ATR_UNUSED;
try
{
// Apply the options stored in the group first...
for (size_t i = 0; i < g.m_config.size(); ++i)
{
HLOGC(aclog.Debug, log << "groupConnect: OPTION @" << sid << " #" << g.m_config[i].so);
error_reason = "setting group-derived option: #" + Sprint(g.m_config[i].so);
ns->core().setOpt(g.m_config[i].so, &g.m_config[i].value[0], g.m_config[i].value.size());
}
// ...then the per-member configuration object, if given...
if (config)
{
error_reason = "user option";
ns->core().applyMemberConfigObject(*config);
}
// ...and finally bind to the source address, if one was specified.
error_reason = "bound address";
if (!source_addr.empty())
bind(ns, source_addr);
}
catch (CUDTException& e)
{
targets[tii].errorcode = e.getErrorCode();
LOGC(aclog.Error, log << "srt_connect_group: failed to set " << error_reason);
}
catch (...)
{
LOGC(aclog.Error, log << "IPE: CUDT::setOpt reported unknown exception");
targets[tii].errorcode = SRT_EINVPARAM;
}
// Configuration failed: destroy the socket and go on with the next target.
if (targets[tii].errorcode != SRT_SUCCESS)
{
ScopedLock cs(m_GlobControlLock);
SRTSOCKET id = ns->m_SocketID;
targets[tii].id = CUDT::INVALID_SOCK;
delete ns;
m_Sockets.erase(id);
continue;
}
// Add the socket to the group as a new member.
CUDTGroup::SocketData data = g.prepareData(ns);
// A token value of -1 means "not set"; one is generated then and
// reported back in the target entry.
if (targets[tii].token != -1)
{
data.token = targets[tii].token;
}
else
{
data.token = CUDTGroup::genToken();
targets[tii].token = data.token;
}
CUDTGroup::gli_t f = g.add(data);
ns->m_IncludedIter = f;
ns->m_IncludedGroup = &g;
f->weight = targets[tii].weight;
int isn = g.currentSchedSequence();
if (g.synconmsgno())
{
HLOGC(aclog.Debug, log << "groupConnect: NOT synchronizing sequence numbers: will sync on msgno");
isn = -1;
}
// Mark the socket as a group member and turn both its synchronous-mode
// flags off.
ns->m_pUDT->m_OPT_GroupConnect = 1;
ns->m_pUDT->m_bSynRecving = false;
ns->m_pUDT->m_bSynSending = false;
HLOGC(aclog.Debug, log << "groupConnect: NOTIFIED AS PENDING @" << sid << " both read and write");
// Subscribe the socket for connect/error events in both epoll
// containers of the group.
srt_epoll_add_usock(g.m_SndEID, sid, &connect_modes);
srt_epoll_add_usock(g.m_RcvEID, sid, &connect_modes);
if (block_new_opened)
{
HLOGC(aclog.Debug, log << "groupConnect: WILL BLOCK on @" << sid << " until connected");
srt_epoll_add_usock(eid, sid, &connect_modes);
}
try
{
HLOGC(aclog.Debug, log << "groupConnect: connecting a new socket with ISN=" << isn);
connectIn(ns, target_addr, isn);
}
catch (CUDTException& e)
{
// This link failed: report the error in the target entry, destroy
// the socket and go on with the next target.
LOGC(aclog.Error, log << "groupConnect: socket @" << sid << " in group " << pg->id() << " failed to connect");
ns->removeFromGroup(false);
erc_rloc = e.getErrorCode();
targets[tii].errorcode = e.getErrorCode();
targets[tii].id = CUDT::INVALID_SOCK;
ScopedLock cl (m_GlobControlLock);
m_Sockets.erase(ns->m_SocketID);
delete ns;
continue;
}
catch (...)
{
// Unknown exception: clean the socket up and abort the whole call.
LOGC(aclog.Fatal, log << "groupConnect: IPE: UNKNOWN EXCEPTION from connectIn");
ns->removeFromGroup(false);
targets[tii].errorcode = SRT_ESYSOBJ;
targets[tii].id = CUDT::INVALID_SOCK;
ScopedLock cl (m_GlobControlLock);
m_Sockets.erase(ns->m_SocketID);
delete ns;
throw CUDTException(MJ_SYSTEMRES, MN_OBJECT);
}
// Read the socket status under its control lock.
SRT_SOCKSTATUS st;
{
ScopedLock grd (ns->m_ControlLock);
st = ns->getStatus();
}
{
ScopedLock grd (g.m_GroupLock);
// Only when the group had no members on entry.
if (was_empty)
{
g.syncWithSocket(ns->core(), HSD_INITIATOR);
}
HLOGC(aclog.Debug, log << "groupConnect: @" << sid << " connection successful, setting group OPEN (was "
<< (g.m_bOpened ? "ALREADY" : "NOT") << "), will " << (block_new_opened ? "" : "NOT ")
<< "block the connect call, status:" << SockStatusStr(st));
g.m_bOpened = true;
f->laststatus = st;
f->agent = source_addr;
f->peer = target_addr;
if (st >= SRTS_BROKEN)
{
// Broken already: mark the member and unsubscribe it from the
// group's epoll containers.
f->sndstate = SRT_GST_BROKEN;
f->rcvstate = SRT_GST_BROKEN;
srt_epoll_remove_usock(g.m_SndEID, sid);
srt_epoll_remove_usock(g.m_RcvEID, sid);
}
else
{
// Connection in progress: report the socket in the target entry.
f->sndstate = SRT_GST_PENDING;
f->rcvstate = SRT_GST_PENDING;
spawned[sid] = ns;
sid_rloc = sid;
erc_rloc = 0;
retval = sid;
}
}
}
if (retval == -1)
{
HLOGC(aclog.Debug, log << "groupConnect: none succeeded as background-spawn, exit with error");
block_new_opened = false; }
// Sockets found broken during the wait; they are closed before returning.
vector<SRTSOCKET> broken;
// Blocking mode: wait until any spawned socket is connected or all failed.
while (block_new_opened)
{
if (spawned.empty())
{
retval = -1;
break;
}
HLOGC(aclog.Debug, log << "groupConnect: first connection, applying EPOLL WAITING.");
int len = spawned.size();
vector<SRTSOCKET> ready(spawned.size());
// Wait with no time limit (-1); only the second pair of arguments
// (`ready` with `len`) collects socket IDs.
const int estat = srt_epoll_wait(eid,
NULL, NULL, &ready[0], &len, -1, NULL, NULL,
NULL, NULL
);
if (estat == -1)
{
#if ENABLE_LOGGING
CUDTException& x = CUDT::getlasterror();
if (x.getErrorCode() != SRT_EPOLLEMPTY)
{
LOGC(aclog.Error, log << "groupConnect: srt_epoll_wait failed not because empty, unexpected IPE:"
<< x.getErrorMessage());
}
#endif
HLOGC(aclog.Debug, log << "groupConnect: srt_epoll_wait failed - breaking the wait loop");
retval = -1;
break;
}
ScopedLock lock (*g.exp_groupLock());
// Sweep all spawned sockets for those that got broken in the meantime.
for (map<SRTSOCKET, CUDTSocket*>::iterator y = spawned.begin();
y != spawned.end(); ++y)
{
SRTSOCKET sid = y->first;
if (CUDT::getsockstate(sid) >= SRTS_BROKEN)
{
HLOGC(aclog.Debug, log << "groupConnect: Socket @" << sid << " got BROKEN in the meantine during the check, remove from candidates");
broken.push_back(sid);
srt_epoll_remove_usock(eid, sid);
srt_epoll_remove_usock(g.m_SndEID, sid);
srt_epoll_remove_usock(g.m_RcvEID, sid);
}
}
// Erase after the sweep, so that the map is not modified while iterated.
for (size_t i = 0; i < broken.size(); ++i)
spawned.erase(broken[i]);
// Examine the sockets reported ready by the epoll wait.
for (int i = 0; i < len; ++i)
{
map<SRTSOCKET, CUDTSocket*>::iterator x = spawned.find(ready[i]);
if (x == spawned.end())
{
// Not among the candidates (e.g. just removed as broken).
continue;
}
SRTSOCKET sid = x->first;
CUDTSocket* s = x->second;
SRT_SOCKSTATUS st = s->getStatus();
if (st >= SRTS_BROKEN)
{
HLOGC(aclog.Debug, log << "groupConnect: Socket @" << sid << " got BROKEN during background connect, remove & TRY AGAIN");
if (spawned.erase(sid))
broken.push_back(sid);
srt_epoll_remove_usock(eid, sid);
srt_epoll_remove_usock(g.m_SndEID, sid);
srt_epoll_remove_usock(g.m_RcvEID, sid);
continue;
}
if (st == SRTS_CONNECTED)
{
// The first connected socket ends the wait.
HLOGC(aclog.Debug, log << "groupConnect: Socket @" << sid << " got CONNECTED as first in the group - reporting");
retval = sid;
g.m_bConnected = true;
block_new_opened = false;
srt_epoll_remove_usock(g.m_SndEID, sid);
break;
}
HLOGC(aclog.Debug, log << "groupConnect: Socket @" << sid << " got spurious wakeup in "
<< SockStatusStr(st) << " TRY AGAIN");
}
}
// Release the temporary epoll container, if one was created.
if (eid != -1)
{
HLOGC(aclog.Debug, log << "connect FIRST IN THE GROUP finished, removing E" << eid);
srt_epoll_release(eid);
}
// Close every socket that was found broken, if it still can be located.
for (vector<SRTSOCKET>::iterator b = broken.begin(); b != broken.end(); ++b)
{
CUDTSocket* s = locateSocket(*b, ERH_RETURN);
if (!s)
continue;
close(s);
}
if (retval == -1)
throw CUDTException(MJ_CONNECTION, MN_CONNLOST, 0);
return retval;
}
#endif
// Starts connecting the socket `s` to `target_addr`.
//
// A socket in the SRTS_INIT state is opened and bound to an auto-selected
// address of the target's family first. A socket bound already must be in
// the SRTS_OPENED state and bound to the same address family as the target.
//
// @param forced_isn  passed on to CUDT::startConnect().
// @return 0 on success.
// @throws CUDTException
//         MJ_NOTSUP/MN_ISRENDUNBOUND  - unbound socket in rendezvous mode,
//         MJ_NOTSUP/MN_ISCONNECTED    - socket neither in INIT nor OPENED state,
//         MJ_NOTSUP/MN_INVAL          - address family mismatch,
//         plus any CUDTException raised by startConnect(), in which case the
//         status is reverted to SRTS_OPENED.
int CUDTUnited::connectIn(CUDTSocket* s, const sockaddr_any& target_addr, int32_t forced_isn)
{
    ScopedLock cg(s->m_ControlLock);

    if (s->m_Status == SRTS_INIT)
    {
        // A rendezvous socket must be bound explicitly.
        if (s->m_pUDT->m_bRendezvous)
            throw CUDTException(MJ_NOTSUP, MN_ISRENDUNBOUND, 0);

        // Implicit bind to an address of the same family as the target.
        s->m_pUDT->open();
        sockaddr_any autoselect_sa (target_addr.family());
        updateMux(s, autoselect_sa);
        s->m_Status = SRTS_OPENED;
    }
    else
    {
        if (s->m_Status != SRTS_OPENED)
            throw CUDTException(MJ_NOTSUP, MN_ISCONNECTED, 0);

        if (target_addr.family() != s->m_SelfAddr.family())
        {
            LOGP(cnlog.Error, "srt_connect: socket is bound to a different family than target address");
            throw CUDTException(MJ_NOTSUP, MN_INVAL, 0);
        }
    }

    s->m_Status = SRTS_CONNECTING;

    try
    {
        // When m_bSynRecving is set, the control lock is handed to
        // InvertedLock for the duration of startConnect(); otherwise a null
        // pointer is passed.
        InvertedLock l_unlocker (s->m_pUDT->m_bSynRecving ? &s->m_ControlLock : 0);

        s->m_PeerAddr = target_addr;
        s->m_pUDT->startConnect(target_addr, forced_isn);
    }
    catch (const CUDTException&)
    {
        // Interceptor only: revert the state and pass the very same exception
        // object on. A plain `throw;` avoids copying (and possibly slicing)
        // the exception.
        s->m_Status = SRTS_OPENED;
        throw;
    }

    return 0;
}
int CUDTUnited::close(const SRTSOCKET u)
{
#if ENABLE_EXPERIMENTAL_BONDING
if (u & SRTGROUP_MASK)
{
CUDTGroup* g = locateGroup(u);
if (!g)
throw CUDTException(MJ_NOTSUP, MN_SIDINVAL, 0);
g->close();
deleteGroup(g);
return 0;
}
#endif
CUDTSocket* s = locateSocket(u);
if (!s)
throw CUDTException(MJ_NOTSUP, MN_SIDINVAL, 0);
return close(s);
}
// Closes the given socket.
//
// A listening socket is marked broken, taken out of the listening state and
// all threads waiting on its accept condition are woken up. Any other socket
// is shut down and moved from the active socket map to the closed socket map.
// When SRT_ENABLE_CLOSE_SYNCH is enabled and the socket had m_bSynSending
// set, the call additionally waits for the sender buffer to get empty.
//
// @return 0 in every case.
int CUDTUnited::close(CUDTSocket* s)
{
HLOGC(smlog.Debug, log << s->m_pUDT->CONID() << " CLOSE. Acquiring control lock");
ScopedLock socket_cg(s->m_ControlLock);
HLOGC(smlog.Debug, log << s->m_pUDT->CONID() << " CLOSING (removing from listening, closing CUDT)");
// The flag and the ID are read up front, before the socket is shut down.
bool synch_close_snd = s->m_pUDT->m_bSynSending;
SRTSOCKET u = s->m_SocketID;
if (s->m_Status == SRTS_LISTENING)
{
// A listener marked broken already has been closed before.
if (s->m_pUDT->m_bBroken)
return 0;
s->m_tsClosureTimeStamp = steady_clock::now();
s->m_pUDT->m_bBroken = true;
HLOGC(smlog.Debug, log << s->m_pUDT->CONID() << " CLOSING (removing listener immediately)");
s->m_pUDT->notListening();
// Wake up all threads waiting on the accept condition.
CSync::lock_broadcast(s->m_AcceptCond, s->m_AcceptLock);
}
else
{
s->makeShutdown();
HLOGC(smlog.Debug, log << "@" << u << "U::close done. GLOBAL CLOSE: " << s->m_pUDT->CONID() << ". Acquiring GLOBAL control lock");
// The global lock is taken after the socket's control lock and held until
// the end of this branch.
ScopedLock manager_cg(m_GlobControlLock);
sockets_t::iterator i = m_Sockets.find(u);
// Not among the active sockets or closed already: nothing to do.
if ((i == m_Sockets.end()) || (i->second->m_Status == SRTS_CLOSED))
{
HLOGC(smlog.Debug, log << "@" << u << "U::close: NOT AN ACTIVE SOCKET, returning.");
return 0;
}
// From here on the object found in the map is used.
s = i->second;
s->m_Status = SRTS_CLOSED;
s->m_tsClosureTimeStamp = steady_clock::now();
// Move the socket from the active map to the closed map.
m_Sockets.erase(s->m_SocketID);
m_ClosedSockets[s->m_SocketID] = s;
HLOGC(smlog.Debug, log << "@" << u << "U::close: Socket MOVED TO CLOSED for collecting later.");
CGlobEvent::triggerEvent();
}
HLOGC(smlog.Debug, log << "@" << u << ": GLOBAL: CLOSING DONE");
if ( synch_close_snd )
{
#if SRT_ENABLE_CLOSE_SYNCH
HLOGC(smlog.Debug, log << "@" << u << " GLOBAL CLOSING: sync-waiting for releasing sender resources...");
// Wait until the sender buffer is gone or empty, or the socket disappears.
for (;;)
{
CSndBuffer* sb = s->m_pUDT->m_pSndBuffer;
if (!sb)
{
HLOGC(smlog.Debug, log << "@" << u << " GLOBAL CLOSING: sending buffer disconnected. Allowed to close.");
break;
}
if (sb->getCurrBufSize() == 0)
{
HLOGC(smlog.Debug, log << "@" << u << " GLOBAL CLOSING: sending buffer depleted. Allowed to close.");
break;
}
// The socket counts as gone when it is no longer in the closed socket
// map, or when its core is no longer opened.
bool isgone = false;
{
ScopedLock manager_cg(m_GlobControlLock);
isgone = m_ClosedSockets.count(u) == 0;
}
if (!isgone)
{
isgone = !s->m_pUDT->m_bOpened;
}
if (isgone)
{
HLOGC(smlog.Debug, log << "@" << u << " GLOBAL CLOSING: ... gone in the meantime, whatever. Exiting close().");
break;
}
HLOGC(smlog.Debug, log << "@" << u << " GLOBAL CLOSING: ... still waiting for any update.");
// Wait for the next global event before checking again.
CGlobEvent::waitForEvent();
}
#endif
}
return 0;
}
void CUDTUnited::getpeername(const SRTSOCKET u, sockaddr* pw_name, int* pw_namelen)
{
if (!pw_name || !pw_namelen)
throw CUDTException(MJ_NOTSUP, MN_INVAL, 0);
if (getStatus(u) != SRTS_CONNECTED)
throw CUDTException(MJ_CONNECTION, MN_NOCONN, 0);
CUDTSocket* s = locateSocket(u);
if (!s)
throw CUDTException(MJ_NOTSUP, MN_SIDINVAL, 0);
if (!s->m_pUDT->m_bConnected || s->m_pUDT->m_bBroken)
throw CUDTException(MJ_CONNECTION, MN_NOCONN, 0);
const int len = s->m_PeerAddr.size();
if (*pw_namelen < len)
throw CUDTException(MJ_NOTSUP, MN_INVAL, 0);
memcpy((pw_name), &s->m_PeerAddr.sa, len);
*pw_namelen = len;
}
void CUDTUnited::getsockname(const SRTSOCKET u, sockaddr* pw_name, int* pw_namelen)
{
if (!pw_name || !pw_namelen)
throw CUDTException(MJ_NOTSUP, MN_INVAL, 0);
CUDTSocket* s = locateSocket(u);
if (!s)
throw CUDTException(MJ_NOTSUP, MN_SIDINVAL, 0);
if (s->m_pUDT->m_bBroken)
throw CUDTException(MJ_NOTSUP, MN_SIDINVAL, 0);
if (s->m_Status == SRTS_INIT)
throw CUDTException(MJ_CONNECTION, MN_NOCONN, 0);
const int len = s->m_SelfAddr.size();
if (*pw_namelen < len)
throw CUDTException(MJ_NOTSUP, MN_INVAL, 0);
memcpy((pw_name), &s->m_SelfAddr.sa, len);
*pw_namelen = len;
}
// select()-style readiness query over sets of SRT socket IDs.
//
// Each non-NULL input set is scanned once up front: IDs whose status is
// SRTS_BROKEN are reported immediately in the matching output set, IDs that
// cannot be located raise CUDTException(MJ_NOTSUP, MN_SIDINVAL), and the rest
// are polled for read/write readiness until at least one socket is ready or
// the timeout elapses. Sockets listed in `exceptfds` are only checked in that
// initial scan; they are not polled afterwards.
//
// On return every non-NULL set is overwritten with the ready IDs.
// Returns the total number of entries reported across the three sets.
int CUDTUnited::select(
    UDT::UDSET* readfds, UDT::UDSET* writefds, UDT::UDSET* exceptfds, const timeval* timeout)
{
    const steady_clock::time_point entertime = steady_clock::now();

    // Compute in 64 bits: tv_sec is a `long`, and where `long` is 32 bits the
    // multiplication by 1000000 overflows for timeouts above ~2147 seconds.
    // NOTE(review): a NULL timeout yields a negative duration, so the loop
    // below runs a single pass; confirm whether "wait forever" was intended.
    const int64_t timeo_us = timeout
        ? int64_t(timeout->tv_sec) * 1000000 + timeout->tv_usec
        : int64_t(-1);
    const steady_clock::duration timeo(microseconds_from(timeo_us));

    int count = 0;
    set<SRTSOCKET> rs, ws, es;

    // Sockets that still need polling for read / write readiness.
    vector<CUDTSocket*> ru, wu;

    CUDTSocket* s;

    if (readfds)
    {
        for (set<SRTSOCKET>::iterator i1 = readfds->begin();
                i1 != readfds->end(); ++ i1)
        {
            if (getStatus(*i1) == SRTS_BROKEN)
            {
                rs.insert(*i1);
                ++ count;
            }
            else if (!(s = locateSocket(*i1)))
                throw CUDTException(MJ_NOTSUP, MN_SIDINVAL, 0);
            else
                ru.push_back(s);
        }
    }

    if (writefds)
    {
        for (set<SRTSOCKET>::iterator i2 = writefds->begin();
                i2 != writefds->end(); ++ i2)
        {
            if (getStatus(*i2) == SRTS_BROKEN)
            {
                ws.insert(*i2);
                ++ count;
            }
            else if (!(s = locateSocket(*i2)))
                throw CUDTException(MJ_NOTSUP, MN_SIDINVAL, 0);
            else
                wu.push_back(s);
        }
    }

    if (exceptfds)
    {
        for (set<SRTSOCKET>::iterator i3 = exceptfds->begin();
                i3 != exceptfds->end(); ++ i3)
        {
            if (getStatus(*i3) == SRTS_BROKEN)
            {
                es.insert(*i3);
                ++ count;
            }
            else if (!locateSocket(*i3))
            {
                // Validation only: these sockets are not polled afterwards.
                throw CUDTException(MJ_NOTSUP, MN_SIDINVAL, 0);
            }
        }
    }

    do
    {
        // Query read sockets.
        for (vector<CUDTSocket*>::iterator j1 = ru.begin(); j1 != ru.end(); ++ j1)
        {
            s = *j1;
            if (s->readReady() || s->m_Status == SRTS_CLOSED)
            {
                rs.insert(s->m_SocketID);
                ++ count;
            }
        }

        // Query write sockets.
        for (vector<CUDTSocket*>::iterator j2 = wu.begin(); j2 != wu.end(); ++ j2)
        {
            s = *j2;
            if (s->writeReady() || s->m_Status == SRTS_CLOSED)
            {
                ws.insert(s->m_SocketID);
                ++ count;
            }
        }

        if (0 < count)
            break;

        CGlobEvent::waitForEvent();
    } while (timeo > steady_clock::now() - entertime);

    if (readfds)
        *readfds = rs;

    if (writefds)
        *writefds = ws;

    if (exceptfds)
        *exceptfds = es;

    return count;
}
// Vector-based readiness query over the socket IDs in `fds`.
//
// The non-NULL output vectors are cleared, then all IDs are polled until at
// least one entry has been reported or the timeout (milliseconds) elapses:
//  - IDs that cannot be located, or whose socket is broken or CLOSED, go to
//    `exceptfds` (and are counted only when `exceptfds` is non-NULL);
//  - connected sockets with receive data ready, or listening sockets with
//    queued connections, go to `readfds`;
//  - connected sockets whose send buffer is below m_iSndBufSize go to
//    `writefds`.
// Returns the total number of entries appended to the output vectors.
int CUDTUnited::selectEx(
    const vector<SRTSOCKET>& fds,
    vector<SRTSOCKET>* readfds,
    vector<SRTSOCKET>* writefds,
    vector<SRTSOCKET>* exceptfds,
    int64_t msTimeOut)
{
    const steady_clock::time_point entertime = steady_clock::now();

    // Keep the microsecond value in 64 bits: storing it in a `long` truncates
    // large timeouts on platforms where `long` is 32 bits wide.
    // NOTE(review): a negative msTimeOut yields a negative duration, so the
    // loop below runs a single pass; confirm whether "wait forever" was
    // intended.
    const int64_t timeo_us = msTimeOut >= 0
        ? msTimeOut * 1000
        : int64_t(-1);
    const steady_clock::duration timeo(microseconds_from(timeo_us));

    int count = 0;
    if (readfds)
        readfds->clear();
    if (writefds)
        writefds->clear();
    if (exceptfds)
        exceptfds->clear();

    do
    {
        for (vector<SRTSOCKET>::const_iterator i = fds.begin();
                i != fds.end(); ++ i)
        {
            CUDTSocket* s = locateSocket(*i);

            if ((!s) || s->m_pUDT->m_bBroken || (s->m_Status == SRTS_CLOSED))
            {
                if (exceptfds)
                {
                    exceptfds->push_back(*i);
                    ++ count;
                }
                continue;
            }

            if (readfds)
            {
                if ((s->m_pUDT->m_bConnected
                            && s->m_pUDT->m_pRcvBuffer->isRcvDataReady()
                    )
                        || (s->m_pUDT->m_bListening
                            && (s->m_pQueuedSockets->size() > 0)))
                {
                    readfds->push_back(s->m_SocketID);
                    ++ count;
                }
            }

            if (writefds)
            {
                if (s->m_pUDT->m_bConnected
                        && (s->m_pUDT->m_pSndBuffer->getCurrBufSize()
                            < s->m_pUDT->m_iSndBufSize))
                {
                    writefds->push_back(s->m_SocketID);
                    ++ count;
                }
            }
        }

        if (count > 0)
            break;

        CGlobEvent::waitForEvent();
    } while (timeo > steady_clock::now() - entertime);

    return count;
}
// Forwards to the epoll manager's create() and returns its result.
int CUDTUnited::epoll_create()
{
    const int created = m_EPoll.create();
    return created;
}
// Forwards to the epoll manager's clear_usocks() for container `eid`.
int CUDTUnited::epoll_clear_usocks(int eid)
{
    const int status = m_EPoll.clear_usocks(eid);
    return status;
}
// Subscribes SRT socket (or, with bonding enabled, group) `u` to epoll
// container `eid` for `events`, and records `eid` on the entity itself.
// Throws CUDTException(MJ_NOTSUP, MN_SIDINVAL) when `u` cannot be located.
// Returns the result of the epoll manager's update_usock() for both the
// group and the socket path.
int CUDTUnited::epoll_add_usock(
    const int eid, const SRTSOCKET u, const int* events)
{
#if ENABLE_EXPERIMENTAL_BONDING
    if (u & SRTGROUP_MASK)
    {
        CUDTGroup* g = locateGroup(u);
        if (!g)
            throw CUDTException(MJ_NOTSUP, MN_SIDINVAL, 0);

        // Propagate the subscription result instead of a hard-coded 0 so
        // that groups and sockets report through the same path.
        const int gret = m_EPoll.update_usock(eid, u, events);
        g->addEPoll(eid);
        return gret;
    }
#endif

    CUDTSocket* s = locateSocket(u);
    if (!s)
        throw CUDTException(MJ_NOTSUP, MN_SIDINVAL);

    const int ret = m_EPoll.update_usock(eid, u, events);
    s->m_pUDT->addEPoll(eid);
    return ret;
}
// Forwards a system-socket subscription to the epoll manager's add_ssock().
int CUDTUnited::epoll_add_ssock(
    const int eid, const SYSSOCKET s, const int* events)
{
    const int status = m_EPoll.add_ssock(eid, s, events);
    return status;
}
// Forwards a system-socket subscription change to the epoll manager's
// update_ssock().
int CUDTUnited::epoll_update_ssock(
    const int eid, const SYSSOCKET s, const int* events)
{
    const int status = m_EPoll.update_ssock(eid, s, events);
    return status;
}
// Detaches entity `ent` from epoll container `eid` in three steps:
// clear its readiness state, drop `eid` from the entity's own subscriber
// list, then clear the subscription in the epoll manager by updating it with
// an empty event mask. Returns the result of that last update.
// EntityType must provide id(), removeEPollEvents(int) and removeEPollID(int).
template <class EntityType>
int CUDTUnited::epoll_remove_entity(const int eid, EntityType* ent)
{
    // Step 1: readiness flags.
    HLOGC(ealog.Debug, log << "epoll_remove_usock: CLEARING readiness on E" << eid << " of @" << ent->id());
    ent->removeEPollEvents(eid);

    // Step 2: back-reference kept by the entity.
    HLOGC(ealog.Debug, log << "epoll_remove_usock: REMOVING E" << eid << " from back-subscirbers in @" << ent->id());
    ent->removeEPollID(eid);

    // Step 3: the subscription itself; an empty mask means "remove".
    HLOGC(ealog.Debug, log << "epoll_remove_usock: CLEARING subscription on E" << eid << " of @" << ent->id());
    int empty_mask = 0;
    return m_EPoll.update_usock(eid, ent->id(), &empty_mask);
}
int CUDTUnited::epoll_remove_usock(const int eid, const SRTSOCKET u)
{
CUDTSocket* s = 0;
#if ENABLE_EXPERIMENTAL_BONDING
CUDTGroup* g = 0;
if (u & SRTGROUP_MASK)
{
g = locateGroup(u);
if (g)
return epoll_remove_entity(eid, g);
}
else
#endif
{
s = locateSocket(u);
if (s)
return epoll_remove_entity(eid, s->m_pUDT);
}
LOGC(ealog.Error, log << "remove_usock: @" << u
<< " not found as either socket or group. Removing only from epoll system.");
int no_events = 0;
return m_EPoll.update_usock(eid, u, &no_events);
}
// Forwards removal of system socket `s` to the epoll manager's
// remove_ssock().
int CUDTUnited::epoll_remove_ssock(const int eid, const SYSSOCKET s)
{
    const int status = m_EPoll.remove_ssock(eid, s);
    return status;
}
// Forwards to the epoll manager's uwait() with the caller's event array,
// its capacity and the timeout, and returns its result.
int CUDTUnited::epoll_uwait(
    const int eid,
    SRT_EPOLL_EVENT* fdsSet,
    int fdsSize,
    int64_t msTimeOut)
{
    const int result = m_EPoll.uwait(eid, fdsSet, fdsSize, msTimeOut);
    return result;
}
// Forwards to the epoll manager's setflags() for container `eid` and
// returns its result.
int32_t CUDTUnited::epoll_set(int eid, int32_t flags)
{
    const int32_t result = m_EPoll.setflags(eid, flags);
    return result;
}
// Forwards to the epoll manager's release() for container `eid`.
int CUDTUnited::epoll_release(const int eid)
{
    const int status = m_EPoll.release(eid);
    return status;
}
// Looks up socket `u` in m_Sockets under m_GlobControlLock.
// A socket whose status is SRTS_CLOSED is treated as not found.
// On a miss: returns NULL when `erh` is ERH_RETURN, otherwise throws
// CUDTException(MJ_NOTSUP, MN_SIDINVAL).
CUDTSocket* CUDTUnited::locateSocket(const SRTSOCKET u, ErrorHandling erh)
{
    ScopedLock cg (m_GlobControlLock);

    const sockets_t::iterator pos = m_Sockets.find(u);
    const bool usable = (pos != m_Sockets.end())
        && (pos->second->m_Status != SRTS_CLOSED);

    if (usable)
        return pos->second;

    if (erh == ERH_RETURN)
        return NULL;

    throw CUDTException(MJ_NOTSUP, MN_SIDINVAL, 0);
}
#if ENABLE_EXPERIMENTAL_BONDING
// Looks up group `u` in m_Groups under m_GlobControlLock.
// On a miss: throws CUDTException(MJ_NOTSUP, MN_SIDINVAL) when `erh` is
// ERH_THROW, otherwise returns NULL.
CUDTGroup* CUDTUnited::locateGroup(SRTSOCKET u, ErrorHandling erh)
{
    ScopedLock cg (m_GlobControlLock);

    const groups_t::iterator pos = m_Groups.find(u);
    if (pos != m_Groups.end())
        return pos->second;

    if (erh == ERH_THROW)
        throw CUDTException(MJ_NOTSUP, MN_SIDINVAL, 0);

    return NULL;
}
#endif
// Finds the socket recorded in m_PeerRec under the peer spec built from
// (`id`, `isn`) whose stored peer address equals `peer`.
// IDs recorded there but no longer present in m_Sockets are skipped.
// Returns NULL when nothing matches. Runs under m_GlobControlLock.
CUDTSocket* CUDTUnited::locatePeer(
    const sockaddr_any& peer,
    const SRTSOCKET id,
    int32_t isn)
{
    ScopedLock cg(m_GlobControlLock);

    const int64_t spec = CUDTSocket::getPeerSpec(id, isn);
    map<int64_t, set<SRTSOCKET> >::iterator rec = m_PeerRec.find(spec);
    if (rec == m_PeerRec.end())
        return NULL;

    set<SRTSOCKET>& candidates = rec->second;
    for (set<SRTSOCKET>::iterator c = candidates.begin();
            c != candidates.end(); ++ c)
    {
        sockets_t::iterator hit = m_Sockets.find(*c);
        if (hit != m_Sockets.end() && hit->second->m_PeerAddr == peer)
            return hit->second;
    }

    return NULL;
}
// Periodic sweep over all sockets, run under m_GlobControlLock.
//
// Pass 1 (m_Sockets): every socket whose core is marked broken is switched to
// SRTS_CLOSED, copied into m_ClosedSockets and detached from its listener's
// queued/accepted sets. Two cases are deferred to a later sweep instead:
//  - a broken listener, until COMM_CLOSE_BROKEN_LISTENER_TIMEOUT_MS has
//    passed since its m_tsClosureTimeStamp;
//  - a non-listener that still has receive data available, while its
//    m_iBrokenCounter is positive (the counter is decremented by each check).
//
// Pass 2 (m_ClosedSockets): a pending linger is finalized once the send
// buffer is gone or empty or the linger deadline has passed; sockets closed
// more than one second ago that are not on the receive-queue list are
// collected for removal.
//
// Erasure from m_Sockets and removeSocket() calls are deferred until after
// both iterations through the `tbc` / `tbr` ID lists.
void CUDTUnited::checkBrokenSockets()
{
ScopedLock cg(m_GlobControlLock);
// tbc: IDs to erase from m_Sockets; tbr: IDs to hand to removeSocket().
vector<SRTSOCKET> tbc;
vector<SRTSOCKET> tbr;
for (sockets_t::iterator i = m_Sockets.begin();
i != m_Sockets.end(); ++ i)
{
CUDTSocket* s = i->second;
if (s->m_pUDT->m_bBroken)
{
if (s->m_Status == SRTS_LISTENING)
{
// Keep a broken listener around until the timeout has elapsed.
const steady_clock::duration elapsed = steady_clock::now() - s->m_tsClosureTimeStamp;
if (elapsed < milliseconds_from(CUDT::COMM_CLOSE_BROKEN_LISTENER_TIMEOUT_MS))
{
continue;
}
}
// Data still readable: postpone while the counter lasts. The
// post-decrement is evaluated only when the first two conditions hold.
else if ((s->m_pUDT->m_pRcvBuffer != NULL)
&& s->m_pUDT->m_pRcvBuffer->isRcvDataAvailable()
&& (s->m_pUDT->m_iBrokenCounter -- > 0))
{
continue;
}
HLOGC(smlog.Debug, log << "checkBrokenSockets: moving BROKEN socket to CLOSED: @" << i->first);
// Close the broken connection and start the removal timer.
s->m_Status = SRTS_CLOSED;
s->m_tsClosureTimeStamp = steady_clock::now();
tbc.push_back(i->first);
m_ClosedSockets[i->first] = s;
// Find the listener of this socket, first among the live sockets,
// then among the closed ones; nothing more to do if it is in neither.
sockets_t::iterator ls = m_Sockets.find(s->m_ListenSocket);
if (ls == m_Sockets.end())
{
ls = m_ClosedSockets.find(s->m_ListenSocket);
if (ls == m_ClosedSockets.end())
continue;
}
// Remove this socket from the listener's queued and accepted sets.
enterCS(ls->second->m_AcceptLock);
ls->second->m_pQueuedSockets->erase(s->m_SocketID);
ls->second->m_pAcceptSockets->erase(s->m_SocketID);
leaveCS(ls->second->m_AcceptLock);
}
}
for (sockets_t::iterator j = m_ClosedSockets.begin();
j != m_ClosedSockets.end(); ++ j)
{
// A non-zero linger expiration means a linger is still pending.
if (!is_zero(j->second->m_pUDT->m_tsLingerExpiration))
{
// Finish lingering when there is no send buffer, it is empty,
// or the linger deadline has been reached.
if ((!j->second->m_pUDT->m_pSndBuffer)
|| (0 == j->second->m_pUDT->m_pSndBuffer->getCurrBufSize())
|| (j->second->m_pUDT->m_tsLingerExpiration <= steady_clock::now()))
{
HLOGC(smlog.Debug, log << "checkBrokenSockets: marking CLOSED qualified @" << j->second->m_SocketID);
j->second->m_pUDT->m_tsLingerExpiration = steady_clock::time_point();
j->second->m_pUDT->m_bClosing = true;
j->second->m_tsClosureTimeStamp = steady_clock::now();
}
}
// Remove only sockets closed more than 1 second ago that have no
// receive-queue node or whose node is no longer on the list.
const steady_clock::time_point now = steady_clock::now();
const steady_clock::duration closed_ago = now - j->second->m_tsClosureTimeStamp;
if ((closed_ago > seconds_from(1))
&& ((!j->second->m_pUDT->m_pRNode)
|| !j->second->m_pUDT->m_pRNode->m_bOnList))
{
HLOGC(smlog.Debug, log << "checkBrokenSockets: @" << j->second->m_SocketID << " closed "
<< FormatDuration(closed_ago) << " ago and removed from RcvQ - will remove");
tbr.push_back(j->first);
}
}
// Move closed sockets out of the live map.
for (vector<SRTSOCKET>::iterator k = tbc.begin(); k != tbc.end(); ++ k)
m_Sockets.erase(*k);
// Destroy sockets whose grace period has expired.
for (vector<SRTSOCKET>::iterator l = tbr.begin(); l != tbr.end(); ++ l)
removeSocket(*l);
}
void CUDTUnited::removeSocket(const SRTSOCKET u)
{
sockets_t::iterator i = m_ClosedSockets.find(u);
if (i == m_ClosedSockets.end())
return;
CUDTSocket* const s = i->second;
const int mid = s->m_iMuxID;
if (s->m_pQueuedSockets)
{
ScopedLock cg(s->m_AcceptLock);
for (set<SRTSOCKET>::iterator q = s->m_pQueuedSockets->begin();
q != s->m_pQueuedSockets->end(); ++ q)
{
sockets_t::iterator si = m_Sockets.find(*q);
if (si == m_Sockets.end())
{
LOGC(smlog.Error, log << "removeSocket: IPE? socket @" << u
<< " being queued for listener socket @" << s->m_SocketID
<< " is GONE in the meantime ???");
continue;
}
CUDTSocket* as = si->second;
as->makeClosed();
m_ClosedSockets[*q] = as;
m_Sockets.erase(*q);
}
}
map<int64_t, set<SRTSOCKET> >::iterator j = m_PeerRec.find(
s->getPeerSpec());
if (j != m_PeerRec.end())
{
j->second.erase(u);
if (j->second.empty())
m_PeerRec.erase(j);
}
m_EPoll.update_events(u, s->m_pUDT->m_sPollID,
SRT_EPOLL_IN|SRT_EPOLL_OUT|SRT_EPOLL_ERR, false);
m_ClosedSockets.erase(i);
HLOGC(smlog.Debug, log << "GC/removeSocket: closing associated UDT @" << u);
s->makeClosed();
HLOGC(smlog.Debug, log << "GC/removeSocket: DELETING SOCKET @" << u);
delete s;
if (mid == -1)
return;
map<int, CMultiplexer>::iterator m;
m = m_mMultiplexer.find(mid);
if (m == m_mMultiplexer.end())
{
LOGC(smlog.Fatal, log << "IPE: For socket @" << u << " MUXER id=" << mid << " NOT FOUND!");
return;
}
CMultiplexer& mx = m->second;
mx.m_iRefCount --;
if (0 == mx.m_iRefCount)
{
HLOGC(smlog.Debug, log << "MUXER id=" << mid << " lost last socket @"
<< u << " - deleting muxer bound to port "
<< mx.m_pChannel->bindAddressAny().hport());
mx.m_pSndQueue->setClosing();
mx.m_pRcvQueue->setClosing();
mx.destroy();
m_mMultiplexer.erase(m);
}
}
// Attaches socket `s` to a multiplexer, under m_GlobControlLock.
//
// When no UDP socket is supplied and the socket allows address reuse, an
// existing reusable multiplexer with the same IP family, MSS, TTL, ToS,
// (optionally) bound device, IPv6-only setting and port is shared: its
// reference count is bumped and its queues are assigned to the socket.
// Otherwise a new multiplexer is created: a channel is configured and
// attached/opened, the bound address is stored in the socket's m_SelfAddr,
// and timer plus send/receive queues are created and registered.
//
// On failure the partially built multiplexer is destroyed; a CUDTException
// is rethrown as is, anything else becomes
// CUDTException(MJ_SYSTEMRES, MN_MEMORY).
void CUDTUnited::updateMux(
    CUDTSocket* s, const sockaddr_any& addr, const UDPSOCKET* udpsock )
{
    ScopedLock cg(m_GlobControlLock);

    CUDT* const udt = s->m_pUDT;

    // Try to share an existing multiplexer first.
    if (!udpsock && udt->m_bReuseAddr)
    {
        const int port = addr.hport();

        for (map<int, CMultiplexer>::iterator it = m_mMultiplexer.begin();
                it != m_mMultiplexer.end(); ++ it)
        {
            CMultiplexer& cand = it->second;

            if (cand.m_iIPversion != addr.family())
                continue;
            if (cand.m_iMSS != udt->m_iMSS)
                continue;
            if (cand.m_iIpTTL != udt->m_iIpTTL)
                continue;
            if (cand.m_iIpToS != udt->m_iIpToS)
                continue;
#ifdef SRT_ENABLE_BINDTODEVICE
            if (!(cand.m_BindToDevice == udt->m_BindToDevice))
                continue;
#endif
            if (cand.m_iIpV6Only != udt->m_iIpV6Only)
                continue;
            if (!cand.m_bReusable)
                continue;
            if (cand.m_iPort != port)
                continue;

            // Compatible multiplexer on the same port: reuse it.
            ++ cand.m_iRefCount;
            udt->m_pSndQueue = cand.m_pSndQueue;
            udt->m_pRcvQueue = cand.m_pRcvQueue;
            s->m_iMuxID = cand.m_iID;
            s->m_SelfAddr.family(addr.family());
            return;
        }
    }

    // Nothing to share: build a fresh multiplexer for this socket.
    CMultiplexer fresh;
    fresh.m_iMSS = udt->m_iMSS;
    fresh.m_iIPversion = addr.family();
    fresh.m_iIpTTL = udt->m_iIpTTL;
    fresh.m_iIpToS = udt->m_iIpToS;
#ifdef SRT_ENABLE_BINDTODEVICE
    fresh.m_BindToDevice = udt->m_BindToDevice;
#endif
    fresh.m_iRefCount = 1;
    fresh.m_iIpV6Only = udt->m_iIpV6Only;
    fresh.m_bReusable = udt->m_bReuseAddr;
    fresh.m_iID = s->m_SocketID;

    try
    {
        // Channel options are applied before the channel is attached/opened.
        fresh.m_pChannel = new CChannel();
        fresh.m_pChannel->setIpTTL(udt->m_iIpTTL);
        fresh.m_pChannel->setIpToS(udt->m_iIpToS);
#ifdef SRT_ENABLE_BINDTODEVICE
        fresh.m_pChannel->setBind(fresh.m_BindToDevice);
#endif
        fresh.m_pChannel->setSndBufSize(udt->m_iUDPSndBufSize);
        fresh.m_pChannel->setRcvBufSize(udt->m_iUDPRcvBufSize);
        if (udt->m_iIpV6Only != -1)
            fresh.m_pChannel->setIpV6Only(udt->m_iIpV6Only);

        if (udpsock)
        {
            // Use the caller-provided UDP socket.
            fresh.m_pChannel->attach(*udpsock, addr);
        }
        else if (addr.empty())
        {
            // No explicit address: open by family only.
            fresh.m_pChannel->open(addr.family());
        }
        else
        {
            fresh.m_pChannel->open(addr);
        }

        // Record the address the channel actually got.
        sockaddr_any bound;
        fresh.m_pChannel->getSockAddr((bound));
        fresh.m_iPort = bound.hport();
        s->m_SelfAddr = bound;

        fresh.m_pTimer = new CTimer;

        fresh.m_pSndQueue = new CSndQueue;
        fresh.m_pSndQueue->init(fresh.m_pChannel, fresh.m_pTimer);

        fresh.m_pRcvQueue = new CRcvQueue;
        fresh.m_pRcvQueue->init(
                32, udt->maxPayloadSize(), fresh.m_iIPversion, 1024,
                fresh.m_pChannel, fresh.m_pTimer);

        m_mMultiplexer[fresh.m_iID] = fresh;

        udt->m_pSndQueue = fresh.m_pSndQueue;
        udt->m_pRcvQueue = fresh.m_pRcvQueue;
        s->m_iMuxID = fresh.m_iID;
    }
    catch (const CUDTException&)
    {
        fresh.destroy();
        throw;
    }
    catch (...)
    {
        fresh.destroy();
        throw CUDTException(MJ_SYSTEMRES, MN_MEMORY, 0);
    }

    HLOGF(smlog.Debug,
            "creating new multiplexer for port %i\n", fresh.m_iPort);
}
// Attaches socket `s` to the multiplexer used by listener `ls`, under
// m_GlobControlLock.
//
// The multiplexer is looked up by the listener's m_iMuxID. If that fails, an
// error is logged and the multiplexers are searched by the listener's port:
// one whose IP version equals the peer address family of `s` wins outright;
// otherwise the last port match of a different family is used, but only when
// its m_iIpV6Only is 0.
//
// Returns true when a multiplexer was found and assigned (its reference count
// is incremented), false otherwise.
bool CUDTUnited::updateListenerMux(CUDTSocket* s, const CUDTSocket* ls)
{
    ScopedLock cg(m_GlobControlLock);
    const int port = ls->m_SelfAddr.hport();

    HLOGC(smlog.Debug, log << "updateListenerMux: finding muxer of listener socket @"
            << ls->m_SocketID << " muxid=" << ls->m_iMuxID
            << " bound=" << ls->m_SelfAddr.str()
            << " FOR @" << s->m_SocketID << " addr="
            << s->m_SelfAddr.str() << "_->_" << s->m_PeerAddr.str());

    // First choice: direct lookup by the listener's multiplexer ID.
    CMultiplexer* mux = map_getp(m_mMultiplexer, ls->m_iMuxID);

    // Port match with a different IP family, kept as a second choice.
    CMultiplexer* fallback = NULL;

    if (!mux)
    {
        LOGC(smlog.Error, log << "updateListenerMux: IPE? listener muxer not found by ID, trying by port");

        for (map<int, CMultiplexer>::iterator it = m_mMultiplexer.begin();
                it != m_mMultiplexer.end(); ++ it)
        {
            CMultiplexer& m = it->second;

#if ENABLE_HEAVY_LOGGING
            ostringstream that_muxer;
            that_muxer << "id=" << m.m_iID
                << " port=" << m.m_iPort
                << " ip=" << (m.m_iIPversion == AF_INET ? "v4" : "v6");
#endif

            if (m.m_iPort != port)
            {
                HLOGC(smlog.Debug, log << "updateListenerMux: SKIPPING muxer: " << that_muxer.str());
                continue;
            }

            HLOGC(smlog.Debug, log << "updateListenerMux: reusing muxer: " << that_muxer.str());
            if (m.m_iIPversion == s->m_PeerAddr.family())
            {
                // Same port and same family: exact match, stop searching.
                mux = &m;
                break;
            }

            fallback = &m;
        }

        // A different-family multiplexer is acceptable only when its
        // IPv6-only setting is 0.
        if (!mux && fallback && fallback->m_iIpV6Only == 0)
        {
            HLOGC(smlog.Warn, log << "updateListenerMux: reusing multiplexer from different family");
            mux = fallback;
        }
    }

    if (!mux)
        return false;

    // Share the multiplexer with the new socket.
    ++ mux->m_iRefCount;
    s->m_pUDT->m_pSndQueue = mux->m_pSndQueue;
    s->m_pUDT->m_pRcvQueue = mux->m_pRcvQueue;
    s->m_iMuxID = mux->m_iID;
    return true;
}
// Garbage-collector routine; `p` is the CUDTUnited instance to work on.
//
// While m_bClosing is not set, it runs checkBrokenSockets() and then waits on
// m_GCStopCond for up to one second per iteration. Once closing is requested
// it closes every remaining socket, moves them all to m_ClosedSockets,
// resets their closure timestamps and keeps running checkBrokenSockets()
// until m_ClosedSockets is empty. Always returns NULL.
void* CUDTUnited::garbageCollect(void* p)
{
CUDTUnited* self = (CUDTUnited*)p;
THREAD_STATE_INIT("SRT:GC");
// m_GCStopLock is held for the lifetime of `gclock`; wait_for() below is
// given this lock.
UniqueLock gclock(self->m_GCStopLock);
while (!self->m_bClosing)
{
INCREMENT_THREAD_ITERATIONS();
self->checkBrokenSockets();
HLOGC(inlog.Debug, log << "GC: sleep 1 s");
self->m_GCStopCond.wait_for(gclock, seconds_from(1));
}
// Shutdown path: remove all sockets and multiplexers.
HLOGC(inlog.Debug, log << "GC: GLOBAL EXIT - releasing all pending sockets. Acquring control lock...");
enterCS(self->m_GlobControlLock);
for (sockets_t::iterator i = self->m_Sockets.begin();
i != self->m_Sockets.end(); ++ i)
{
i->second->makeClosed();
self->m_ClosedSockets[i->first] = i->second;
// Detach the socket from its listener's queues; the listener is looked
// up among live sockets first, then among closed ones.
sockets_t::iterator ls = self->m_Sockets.find(
i->second->m_ListenSocket);
if (ls == self->m_Sockets.end())
{
ls = self->m_ClosedSockets.find(i->second->m_ListenSocket);
if (ls == self->m_ClosedSockets.end())
continue;
}
enterCS(ls->second->m_AcceptLock);
ls->second->m_pQueuedSockets->erase(i->second->m_SocketID);
ls->second->m_pAcceptSockets->erase(i->second->m_SocketID);
leaveCS(ls->second->m_AcceptLock);
}
self->m_Sockets.clear();
// Reset the closure timestamps to the default time_point so that the
// "closed more than 1 second ago" test in checkBrokenSockets() does not
// delay removal (presumably the default time_point is far in the past).
for (sockets_t::iterator j = self->m_ClosedSockets.begin();
j != self->m_ClosedSockets.end(); ++ j)
{
j->second->m_tsClosureTimeStamp = steady_clock::time_point();
}
leaveCS(self->m_GlobControlLock);
HLOGC(inlog.Debug, log << "GC: GLOBAL EXIT - releasing all CLOSED sockets.");
// Keep sweeping until every closed socket has been removed; the control
// lock is taken only briefly to sample the emptiness of the map.
while (true)
{
self->checkBrokenSockets();
enterCS(self->m_GlobControlLock);
bool empty = self->m_ClosedSockets.empty();
leaveCS(self->m_GlobControlLock);
if (empty)
break;
srt::sync::this_thread::sleep_for(milliseconds_from(1));
}
THREAD_EXIT();
return NULL;
}
// Forwards to the global CUDTUnited instance's startup().
int CUDT::startup()
{
    const int status = s_UDTUnited.startup();
    return status;
}
// Forwards to the global CUDTUnited instance's cleanup().
int CUDT::cleanup()
{
    const int status = s_UDTUnited.cleanup();
    return status;
}
// Creates a new SRT socket and returns its ID.
// Calls startup() first when m_bGCStatus is not set. Any exception is stored
// as the thread-local error and INVALID_SOCK is returned instead.
SRTSOCKET CUDT::socket()
{
    if (!s_UDTUnited.m_bGCStatus)
        s_UDTUnited.startup();

    try
    {
        const SRTSOCKET created = s_UDTUnited.newSocket();
        return created;
    }
    catch (const CUDTException& udt_error)
    {
        SetThreadLocalError(udt_error);
    }
    catch (const bad_alloc&)
    {
        SetThreadLocalError(CUDTException(MJ_SYSTEMRES, MN_MEMORY, 0));
    }
    catch (const std::exception& unexpected)
    {
        LOGC(aclog.Fatal, log << "socket: UNEXPECTED EXCEPTION: "
                << typeid(unexpected).name()
                << ": " << unexpected.what());
        SetThreadLocalError(CUDTException(MJ_UNKNOWN, MN_NONE, 0));
    }

    // Reached only through one of the handlers above.
    return INVALID_SOCK;
}
// Records exception `e` as the calling thread's last error.
CUDT::APIError::APIError(const CUDTException& e)
{
    const CUDTException& reported = e;
    SetThreadLocalError(reported);
}
// Builds a CUDTException from (`mj`, `mn`, `syserr`) and records it as the
// calling thread's last error.
CUDT::APIError::APIError(CodeMajor mj, CodeMinor mn, int syserr)
{
    const CUDTException reported(mj, mn, syserr);
    SetThreadLocalError(reported);
}
#if ENABLE_EXPERIMENTAL_BONDING
// Generates a fresh ID (requested with the `true` flag), registers a new
// group of the given type under it and returns the group after its ID has
// been set.
CUDTGroup& CUDT::newGroup(const int type)
{
    const SRTSOCKET gid = s_UDTUnited.generateSocketID(true);
    const SRT_GROUP_TYPE gtype = static_cast<SRT_GROUP_TYPE>(type);

    return s_UDTUnited.addGroup(gid, gtype).set_id(gid);
}
// Creates a new group of type `gt` and returns its ID.
// Calls startup() first when m_bGCStatus is not set. A CUDTException is
// reported through APIError as is; any other exception is reported as
// MJ_SYSTEMRES / MN_MEMORY.
SRTSOCKET CUDT::createGroup(SRT_GROUP_TYPE gt)
{
    if (!s_UDTUnited.m_bGCStatus)
        s_UDTUnited.startup();

    try
    {
        CUDTGroup& created = newGroup(gt);
        return created.id();
    }
    catch (const CUDTException& udt_error)
    {
        return APIError(udt_error);
    }
    catch (...)
    {
        return APIError(MJ_SYSTEMRES, MN_MEMORY, 0);
    }

    return SRT_INVALID_SOCK;
}
// Makes `socket` a member of `group`.
// Fails with APIError(MJ_NOTSUP, MN_INVAL) when `socket` carries the group
// bit, `group` lacks it, either entity cannot be located, the socket already
// belongs to a group, or the group is managed and not empty. An empty managed
// group is switched to unmanaged before the socket is added.
// Returns 0 on success.
int CUDT::addSocketToGroup(SRTSOCKET socket, SRTSOCKET group)
{
    // `socket` must be a plain socket ID and `group` a group ID.
    const bool socket_has_group_bit = (socket & SRTGROUP_MASK) != 0;
    const bool group_has_group_bit = (group & SRTGROUP_MASK) != 0;
    if (socket_has_group_bit || !group_has_group_bit)
        return APIError(MJ_NOTSUP, MN_INVAL, 0);

    CUDTSocket* const sock = s_UDTUnited.locateSocket(socket);
    CUDTGroup* const grp = s_UDTUnited.locateGroup(group);
    if (sock == NULL || grp == NULL)
        return APIError(MJ_NOTSUP, MN_INVAL, 0);

    // A socket can belong to at most one group.
    if (sock->m_IncludedGroup)
        return APIError(MJ_NOTSUP, MN_INVAL, 0);

    if (grp->managed())
    {
        // Only an empty managed group may be taken over.
        if (!grp->empty())
            return APIError(MJ_NOTSUP, MN_INVAL, 0);

        grp->set_managed(false);
    }

    ScopedLock cg (sock->m_ControlLock);

    const CUDTGroup::gli_t existing = grp->find(socket);
    if (existing != CUDTGroup::gli_NULL())
    {
        // The group already lists this socket: just repair the back-links.
        LOGC(aclog.Error, log << "IPE (non-fatal): the socket is in the group, but has no clue about it!");
        sock->m_IncludedGroup = grp;
        sock->m_IncludedIter = existing;
        return 0;
    }

    sock->m_IncludedGroup = grp;
    sock->m_IncludedIter = grp->add(grp->prepareData(sock));
    return 0;
}
// Removes `socket` from the group it belongs to.
// Fails with APIError(MJ_NOTSUP, MN_INVAL) when the socket cannot be located
// or is not a group member. Returns 0 on success.
int CUDT::removeSocketFromGroup(SRTSOCKET socket)
{
    CUDTSocket* const sock = s_UDTUnited.locateSocket(socket);
    if (sock == NULL)
        return APIError(MJ_NOTSUP, MN_INVAL, 0);

    if (sock->m_IncludedGroup == NULL)
        return APIError(MJ_NOTSUP, MN_INVAL, 0);

    ScopedLock grd (sock->m_ControlLock);
    sock->removeFromGroup(false);
    return 0;
}
void CUDTSocket::removeFromGroup(bool broken)
{
m_IncludedGroup->remove(m_SocketID);
if (broken)
{
m_IncludedGroup->activateUpdateEvent();
}
m_IncludedIter = CUDTGroup::gli_NULL();
m_IncludedGroup = NULL;
}
// Returns the ID of the group that `socket` belongs to.
// Fails with APIError(MJ_NOTSUP, MN_INVAL) when the socket cannot be located
// or is not a group member.
SRTSOCKET CUDT::getGroupOfSocket(SRTSOCKET socket)
{
    CUDTSocket* const sock = s_UDTUnited.locateSocket(socket);
    if (sock == NULL)
        return APIError(MJ_NOTSUP, MN_INVAL, 0);

    CUDTGroup* const grp = sock->m_IncludedGroup;
    if (grp == NULL)
        return APIError(MJ_NOTSUP, MN_INVAL, 0);

    return grp->id();
}
// Passes configuration string `str` to group `groupid` and returns the
// result of the group's configure().
// Fails with APIError(MJ_NOTSUP, MN_INVAL) when `groupid` lacks the group
// bit or the group cannot be located.
int CUDT::configureGroup(SRTSOCKET groupid, const char* str)
{
    const bool is_group_id = (groupid & SRTGROUP_MASK) != 0;
    if (!is_group_id)
        return APIError(MJ_NOTSUP, MN_INVAL, 0);

    CUDTGroup* const grp = s_UDTUnited.locateGroup(groupid, s_UDTUnited.ERH_RETURN);
    if (grp == NULL)
        return APIError(MJ_NOTSUP, MN_INVAL, 0);

    return grp->configure(str);
}
// Forwards `pdata` / `psize` to the getGroupData() of group `groupid` and
// returns its result.
// Fails with APIError(MJ_NOTSUP, MN_INVAL) when `groupid` lacks the group
// bit, `psize` is NULL, or the group cannot be located.
int CUDT::getGroupData(SRTSOCKET groupid, SRT_SOCKGROUPDATA* pdata, size_t* psize)
{
    const bool is_group_id = (groupid & SRTGROUP_MASK) != 0;
    if (!is_group_id || psize == NULL)
        return APIError(MJ_NOTSUP, MN_INVAL, 0);

    CUDTGroup* const grp = s_UDTUnited.locateGroup(groupid, s_UDTUnited.ERH_RETURN);
    if (grp == NULL)
        return APIError(MJ_NOTSUP, MN_INVAL, 0);

    return grp->getGroupData(pdata, psize);
}
#endif
// Binds socket `u` to the local address (`name`, `namelen`).
// An address that sockaddr_any reports with zero length, or a socket that
// cannot be located, is reported as MJ_NOTSUP / MN_INVAL. Exceptions are
// reported through APIError.
int CUDT::bind(SRTSOCKET u, const sockaddr* name, int namelen)
{
    try
    {
        const sockaddr_any local (name, namelen);
        if (local.len == 0)
        {
            // The address could not be interpreted.
            return APIError(MJ_NOTSUP, MN_INVAL, 0);
        }

        CUDTSocket* const sock = s_UDTUnited.locateSocket(u);
        if (sock == NULL)
            return APIError(MJ_NOTSUP, MN_INVAL, 0);

        return s_UDTUnited.bind(sock, local);
    }
    catch (const CUDTException& udt_error)
    {
        return APIError(udt_error);
    }
    catch (const bad_alloc&)
    {
        return APIError(MJ_SYSTEMRES, MN_MEMORY, 0);
    }
    catch (const std::exception& unexpected)
    {
        LOGC(aclog.Fatal, log << "bind: UNEXPECTED EXCEPTION: "
                << typeid(unexpected).name()
                << ": " << unexpected.what());
        return APIError(MJ_UNKNOWN, MN_NONE, 0);
    }
}
// Binds socket `u` to the already existing UDP socket `udpsock`.
// A socket that cannot be located is reported as MJ_NOTSUP / MN_INVAL.
// Exceptions are reported through APIError.
int CUDT::bind(SRTSOCKET u, UDPSOCKET udpsock)
{
    try
    {
        CUDTSocket* const sock = s_UDTUnited.locateSocket(u);
        if (sock == NULL)
            return APIError(MJ_NOTSUP, MN_INVAL, 0);

        return s_UDTUnited.bind(sock, udpsock);
    }
    catch (const CUDTException& udt_error)
    {
        return APIError(udt_error);
    }
    catch (const bad_alloc&)
    {
        return APIError(MJ_SYSTEMRES, MN_MEMORY, 0);
    }
    catch (const std::exception& unexpected)
    {
        LOGC(aclog.Fatal, log << "bind/udp: UNEXPECTED EXCEPTION: "
                << typeid(unexpected).name() << ": " << unexpected.what());
        return APIError(MJ_UNKNOWN, MN_NONE, 0);
    }
}
// Forwards to CUDTUnited::listen() for socket `u` with the given backlog.
// Exceptions are reported through APIError.
int CUDT::listen(SRTSOCKET u, int backlog)
{
    try
    {
        const int status = s_UDTUnited.listen(u, backlog);
        return status;
    }
    catch (const CUDTException& udt_error)
    {
        return APIError(udt_error);
    }
    catch (const bad_alloc&)
    {
        return APIError(MJ_SYSTEMRES, MN_MEMORY, 0);
    }
    catch (const std::exception& unexpected)
    {
        LOGC(aclog.Fatal, log << "listen: UNEXPECTED EXCEPTION: "
                << typeid(unexpected).name() << ": " << unexpected.what());
        return APIError(MJ_UNKNOWN, MN_NONE, 0);
    }
}
// Forwards to CUDTUnited::accept_bond() for the `lsize` listeners in
// `listeners` with timeout `msTimeOut`. On any exception the error is stored
// as the thread-local error and INVALID_SOCK is returned.
SRTSOCKET CUDT::accept_bond(const SRTSOCKET listeners [], int lsize, int64_t msTimeOut)
{
    try
    {
        const SRTSOCKET accepted = s_UDTUnited.accept_bond(listeners, lsize, msTimeOut);
        return accepted;
    }
    catch (const CUDTException& udt_error)
    {
        SetThreadLocalError(udt_error);
    }
    catch (const bad_alloc&)
    {
        SetThreadLocalError(CUDTException(MJ_SYSTEMRES, MN_MEMORY, 0));
    }
    catch (const std::exception& unexpected)
    {
        LOGC(aclog.Fatal, log << "accept_bond: UNEXPECTED EXCEPTION: "
                << typeid(unexpected).name() << ": " << unexpected.what());
        SetThreadLocalError(CUDTException(MJ_UNKNOWN, MN_NONE, 0));
    }

    // Reached only through one of the handlers above.
    return INVALID_SOCK;
}
// Forwards to CUDTUnited::accept() for listener `u`, passing `addr` and
// `addrlen` through. On any exception the error is stored as the
// thread-local error and INVALID_SOCK is returned.
SRTSOCKET CUDT::accept(SRTSOCKET u, sockaddr* addr, int* addrlen)
{
    try
    {
        const SRTSOCKET accepted = s_UDTUnited.accept(u, addr, addrlen);
        return accepted;
    }
    catch (const CUDTException& udt_error)
    {
        SetThreadLocalError(udt_error);
    }
    catch (const bad_alloc&)
    {
        SetThreadLocalError(CUDTException(MJ_SYSTEMRES, MN_MEMORY, 0));
    }
    catch (const std::exception& unexpected)
    {
        LOGC(aclog.Fatal, log << "accept: UNEXPECTED EXCEPTION: "
                << typeid(unexpected).name() << ": " << unexpected.what());
        SetThreadLocalError(CUDTException(MJ_UNKNOWN, MN_NONE, 0));
    }

    // Reached only through one of the handlers above.
    return INVALID_SOCK;
}
// Forwards to the CUDTUnited::connect() overload taking two addresses
// (`name`, `tname`) of length `namelen`.
// Exceptions are reported through APIError.
int CUDT::connect(
    SRTSOCKET u, const sockaddr* name, const sockaddr* tname, int namelen)
{
    try
    {
        const int status = s_UDTUnited.connect(u, name, tname, namelen);
        return status;
    }
    catch (const CUDTException& udt_error)
    {
        return APIError(udt_error);
    }
    catch (const bad_alloc&)
    {
        return APIError(MJ_SYSTEMRES, MN_MEMORY, 0);
    }
    catch (const std::exception& unexpected)
    {
        LOGC(aclog.Fatal, log << "connect: UNEXPECTED EXCEPTION: "
                << typeid(unexpected).name() << ": " << unexpected.what());
        return APIError(MJ_UNKNOWN, MN_NONE, 0);
    }
}
#if ENABLE_EXPERIMENTAL_BONDING
// Connects group `grp` to the `arraysize` targets in `targets` through
// CUDTUnited::groupConnect().
// A non-positive `arraysize` is reported as MN_INVAL and an ID without the
// group bit as MN_SIDINVAL. The group lookup throws when the group does not
// exist; exceptions are reported through APIError.
int CUDT::connectLinks(SRTSOCKET grp,
        SRT_SOCKGROUPCONFIG targets [], int arraysize)
{
    if (arraysize <= 0)
        return APIError(MJ_NOTSUP, MN_INVAL, 0);

    const bool is_group_id = (grp & SRTGROUP_MASK) != 0;
    if (!is_group_id)
    {
        // connectLinks accepts only group IDs.
        return APIError(MJ_NOTSUP, MN_SIDINVAL, 0);
    }

    try
    {
        CUDTGroup* const group = s_UDTUnited.locateGroup(grp, s_UDTUnited.ERH_THROW);
        return s_UDTUnited.groupConnect(group, targets, arraysize);
    }
    catch (const CUDTException& udt_error)
    {
        return APIError(udt_error);
    }
    catch (const bad_alloc&)
    {
        return APIError(MJ_SYSTEMRES, MN_MEMORY, 0);
    }
    catch (const std::exception& unexpected)
    {
        LOGC(aclog.Fatal, log << "connect: UNEXPECTED EXCEPTION: "
                << typeid(unexpected).name() << ": " << unexpected.what());
        return APIError(MJ_UNKNOWN, MN_NONE, 0);
    }
}
#endif
// Forwards to the CUDTUnited::connect() overload taking one address
// (`name`, `namelen`) and `forced_isn`.
// Exceptions are reported through APIError.
int CUDT::connect(
    SRTSOCKET u, const sockaddr* name, int namelen, int32_t forced_isn)
{
    try
    {
        const int status = s_UDTUnited.connect(u, name, namelen, forced_isn);
        return status;
    }
    catch (const CUDTException& udt_error)
    {
        return APIError(udt_error);
    }
    catch (const bad_alloc&)
    {
        return APIError(MJ_SYSTEMRES, MN_MEMORY, 0);
    }
    catch (const std::exception& unexpected)
    {
        LOGC(aclog.Fatal, log << "connect: UNEXPECTED EXCEPTION: "
                << typeid(unexpected).name() << ": " << unexpected.what());
        return APIError(MJ_UNKNOWN, MN_NONE, 0);
    }
}
// Forwards to CUDTUnited::close() for `u`.
// Exceptions are reported through APIError.
int CUDT::close(SRTSOCKET u)
{
    try
    {
        const int status = s_UDTUnited.close(u);
        return status;
    }
    catch (const CUDTException& udt_error)
    {
        return APIError(udt_error);
    }
    catch (const std::exception& unexpected)
    {
        LOGC(aclog.Fatal, log << "close: UNEXPECTED EXCEPTION: "
                << typeid(unexpected).name() << ": " << unexpected.what());
        return APIError(MJ_UNKNOWN, MN_NONE, 0);
    }
}
// Forwards to CUDTUnited::getpeername() for socket `u`, passing `name` and
// `namelen` through. Returns 0 on success; exceptions are reported through
// APIError.
int CUDT::getpeername(SRTSOCKET u, sockaddr* name, int* namelen)
{
    try
    {
        s_UDTUnited.getpeername(u, name, namelen);
    }
    catch (const CUDTException& udt_error)
    {
        return APIError(udt_error);
    }
    catch (const std::exception& unexpected)
    {
        LOGC(aclog.Fatal, log << "getpeername: UNEXPECTED EXCEPTION: "
                << typeid(unexpected).name() << ": " << unexpected.what());
        return APIError(MJ_UNKNOWN, MN_NONE, 0);
    }

    return 0;
}
// Forwards to CUDTUnited::getsockname() for socket `u`, passing `name` and
// `namelen` through. Returns 0 on success; exceptions are reported through
// APIError.
int CUDT::getsockname(SRTSOCKET u, sockaddr* name, int* namelen)
{
    try
    {
        s_UDTUnited.getsockname(u, name, namelen);
    }
    catch (const CUDTException& udt_error)
    {
        return APIError(udt_error);
    }
    catch (const std::exception& unexpected)
    {
        LOGC(aclog.Fatal, log << "getsockname: UNEXPECTED EXCEPTION: "
                << typeid(unexpected).name() << ": " << unexpected.what());
        return APIError(MJ_UNKNOWN, MN_NONE, 0);
    }

    return 0;
}
// Reads option `optname` of socket (or, with bonding enabled, group) `u`
// into `pw_optval` / `pw_optlen`.
// NULL output arguments are reported as MJ_NOTSUP / MN_INVAL. The lookup
// throws when `u` does not exist; exceptions are reported through APIError.
// Returns 0 on success.
int CUDT::getsockopt(
    SRTSOCKET u, int, SRT_SOCKOPT optname, void* pw_optval, int* pw_optlen)
{
    if (pw_optval == NULL || pw_optlen == NULL)
    {
        return APIError(MJ_NOTSUP, MN_INVAL, 0);
    }

    try
    {
#if ENABLE_EXPERIMENTAL_BONDING
        if (u & SRTGROUP_MASK)
        {
            CUDTGroup* const grp = s_UDTUnited.locateGroup(u, s_UDTUnited.ERH_THROW);
            grp->getOpt(optname, (pw_optval), (*pw_optlen));
            return 0;
        }
#endif

        CUDTSocket* const sock = s_UDTUnited.locateSocket(u, s_UDTUnited.ERH_THROW);
        sock->m_pUDT->getOpt(optname, (pw_optval), (*pw_optlen));
        return 0;
    }
    catch (const CUDTException& udt_error)
    {
        return APIError(udt_error);
    }
    catch (const std::exception& unexpected)
    {
        LOGC(aclog.Fatal, log << "getsockopt: UNEXPECTED EXCEPTION: "
                << typeid(unexpected).name() << ": " << unexpected.what());
        return APIError(MJ_UNKNOWN, MN_NONE, 0);
    }
}
// Sets option `optname` of socket (or, with bonding enabled, group) `u`
// from `optval` / `optlen`.
// A NULL `optval` is reported as MJ_NOTSUP / MN_INVAL. The lookup throws
// when `u` does not exist; exceptions are reported through APIError.
// Returns 0 on success.
int CUDT::setsockopt(SRTSOCKET u, int, SRT_SOCKOPT optname, const void* optval, int optlen)
{
    if (optval == NULL)
        return APIError(MJ_NOTSUP, MN_INVAL, 0);

    try
    {
#if ENABLE_EXPERIMENTAL_BONDING
        if (u & SRTGROUP_MASK)
        {
            CUDTGroup* const grp = s_UDTUnited.locateGroup(u, s_UDTUnited.ERH_THROW);
            grp->setOpt(optname, optval, optlen);
            return 0;
        }
#endif

        CUDTSocket* const sock = s_UDTUnited.locateSocket(u, s_UDTUnited.ERH_THROW);
        sock->m_pUDT->setOpt(optname, optval, optlen);
        return 0;
    }
    catch (const CUDTException& udt_error)
    {
        return APIError(udt_error);
    }
    catch (const std::exception& unexpected)
    {
        LOGC(aclog.Fatal, log << "setsockopt: UNEXPECTED EXCEPTION: "
                << typeid(unexpected).name() << ": " << unexpected.what());
        return APIError(MJ_UNKNOWN, MN_NONE, 0);
    }
}
// Sends `len` bytes from `buf` through sendmsg2() using a copy of the
// default message control structure. The trailing int argument is ignored.
int CUDT::send(SRTSOCKET u, const char* buf, int len, int)
{
    SRT_MSGCTRL ctrl = srt_msgctrl_default;
    const int sent = sendmsg2(u, buf, len, (ctrl));
    return sent;
}
// Sends `len` bytes from `buf` through sendmsg2(), starting from the default
// message control structure with msgttl, inorder and srctime overridden by
// the given arguments.
int CUDT::sendmsg(
    SRTSOCKET u, const char* buf, int len, int ttl, bool inorder,
    int64_t srctime)
{
    SRT_MSGCTRL ctrl = srt_msgctrl_default;
    ctrl.msgttl = ttl;
    ctrl.inorder = inorder;
    ctrl.srctime = srctime;

    const int sent = sendmsg2(u, buf, len, (ctrl));
    return sent;
}
// Sends `len` bytes from `buf` with message control `w_m` over socket (or,
// with bonding enabled, group) `u`.
// The lookup throws when `u` does not exist; exceptions are reported through
// APIError.
int CUDT::sendmsg2(
    SRTSOCKET u, const char* buf, int len, SRT_MSGCTRL& w_m)
{
    try
    {
#if ENABLE_EXPERIMENTAL_BONDING
        if (u & SRTGROUP_MASK)
        {
            CUDTGroup* const grp = s_UDTUnited.locateGroup(u, CUDTUnited::ERH_THROW);
            return grp->send(buf, len, (w_m));
        }
#endif

        CUDTSocket* const sock = s_UDTUnited.locateSocket(u, CUDTUnited::ERH_THROW);
        return sock->core().sendmsg2(buf, len, (w_m));
    }
    catch (const CUDTException& udt_error)
    {
        return APIError(udt_error);
    }
    catch (const bad_alloc&)
    {
        return APIError(MJ_SYSTEMRES, MN_MEMORY, 0);
    }
    catch (const std::exception& unexpected)
    {
        LOGC(aclog.Fatal, log << "sendmsg: UNEXPECTED EXCEPTION: "
                << typeid(unexpected).name() << ": " << unexpected.what());
        return APIError(MJ_UNKNOWN, MN_NONE, 0);
    }
}
// Receives up to `len` bytes into `buf` through recvmsg2() using a copy of
// the default message control structure. The trailing int argument is
// ignored.
int CUDT::recv(SRTSOCKET u, char* buf, int len, int)
{
    SRT_MSGCTRL ctrl = srt_msgctrl_default;
    return recvmsg2(u, buf, len, (ctrl));
}
// Receives up to `len` bytes into `buf` through recvmsg2() and copies the
// control structure's srctime into `srctime` afterwards (also when
// recvmsg2() reports an error).
int CUDT::recvmsg(SRTSOCKET u, char* buf, int len, int64_t& srctime)
{
    SRT_MSGCTRL ctrl = srt_msgctrl_default;
    const int received = recvmsg2(u, buf, len, (ctrl));
    srctime = ctrl.srctime;
    return received;
}
// Receives up to `len` bytes into `buf` with message control `w_m` from
// socket (or, with bonding enabled, group) `u`.
// The lookup throws when `u` does not exist; exceptions are reported through
// APIError.
int CUDT::recvmsg2(SRTSOCKET u, char* buf, int len, SRT_MSGCTRL& w_m)
{
    try
    {
#if ENABLE_EXPERIMENTAL_BONDING
        if (u & SRTGROUP_MASK)
        {
            CUDTGroup* const grp = s_UDTUnited.locateGroup(u, CUDTUnited::ERH_THROW);
            return grp->recv(buf, len, (w_m));
        }
#endif

        CUDTSocket* const sock = s_UDTUnited.locateSocket(u, CUDTUnited::ERH_THROW);
        return sock->core().recvmsg2(buf, len, (w_m));
    }
    catch (const CUDTException& udt_error)
    {
        return APIError(udt_error);
    }
    catch (const std::exception& unexpected)
    {
        LOGC(aclog.Fatal, log << "recvmsg: UNEXPECTED EXCEPTION: "
                << typeid(unexpected).name() << ": " << unexpected.what());
        return APIError(MJ_UNKNOWN, MN_NONE, 0);
    }
}
// Forwards to the sendfile() of socket `u`'s core with the stream, offset,
// size and block arguments unchanged.
// The lookup throws when `u` does not exist; exceptions are reported through
// APIError.
int64_t CUDT::sendfile(
    SRTSOCKET u, fstream& ifs, int64_t& offset, int64_t size, int block)
{
    try
    {
        CUDTSocket* const sock = s_UDTUnited.locateSocket(u, s_UDTUnited.ERH_THROW);
        CUDT* const core = sock->m_pUDT;
        return core->sendfile(ifs, offset, size, block);
    }
    catch (const CUDTException& udt_error)
    {
        return APIError(udt_error);
    }
    catch (const bad_alloc&)
    {
        return APIError(MJ_SYSTEMRES, MN_MEMORY, 0);
    }
    catch (const std::exception& unexpected)
    {
        LOGC(aclog.Fatal, log << "sendfile: UNEXPECTED EXCEPTION: "
                << typeid(unexpected).name() << ": " << unexpected.what());
        return APIError(MJ_UNKNOWN, MN_NONE, 0);
    }
}
// Forwards to the recvfile() of socket `u`'s core with the stream, offset,
// size and block arguments unchanged.
// The lookup throws when `u` does not exist; exceptions are reported through
// APIError.
int64_t CUDT::recvfile(
    SRTSOCKET u, fstream& ofs, int64_t& offset, int64_t size, int block)
{
    try
    {
        CUDTSocket* const sock = s_UDTUnited.locateSocket(u, CUDTUnited::ERH_THROW);
        return sock->core().recvfile(ofs, offset, size, block);
    }
    catch (const CUDTException& udt_error)
    {
        return APIError(udt_error);
    }
    catch (const std::exception& unexpected)
    {
        LOGC(aclog.Fatal, log << "recvfile: UNEXPECTED EXCEPTION: "
                << typeid(unexpected).name() << ": " << unexpected.what());
        return APIError(MJ_UNKNOWN, MN_NONE, 0);
    }
}
int CUDT::select(
int,
UDT::UDSET* readfds,
UDT::UDSET* writefds,
UDT::UDSET* exceptfds,
const timeval* timeout)
{
if ((!readfds) && (!writefds) && (!exceptfds))
{
return APIError(MJ_NOTSUP, MN_INVAL, 0);
}
try
{
return s_UDTUnited.select(readfds, writefds, exceptfds, timeout);
}
catch (const CUDTException& e)
{
return APIError(e);
}
catch (bad_alloc&)
{
return APIError(MJ_SYSTEMRES, MN_MEMORY, 0);
}
catch (const std::exception& ee)
{
LOGC(aclog.Fatal, log << "select: UNEXPECTED EXCEPTION: "
<< typeid(ee).name() << ": " << ee.what());
return APIError(MJ_UNKNOWN, MN_NONE, 0);
}
}
// Forwards to CUDTUnited::selectEx() for the socket IDs in `fds`.
// Passing NULL for all three output vectors is reported as
// MJ_NOTSUP / MN_INVAL. Exceptions are reported through APIError.
int CUDT::selectEx(
    const vector<SRTSOCKET>& fds,
    vector<SRTSOCKET>* readfds,
    vector<SRTSOCKET>* writefds,
    vector<SRTSOCKET>* exceptfds,
    int64_t msTimeOut)
{
    if ((!readfds) && (!writefds) && (!exceptfds))
    {
        return APIError(MJ_NOTSUP, MN_INVAL, 0);
    }

    try
    {
        return s_UDTUnited.selectEx(fds, readfds, writefds, exceptfds, msTimeOut);
    }
    catch (const CUDTException& e)
    {
        return APIError(e);
    }
    catch (bad_alloc&)
    {
        return APIError(MJ_SYSTEMRES, MN_MEMORY, 0);
    }
    catch (const std::exception& ee)
    {
        LOGC(aclog.Fatal, log << "selectEx: UNEXPECTED EXCEPTION: "
                << typeid(ee).name() << ": " << ee.what());
        // Use the explicit three-argument form like every other wrapper in
        // this file, so the minor code and system error are stated here
        // rather than left to an implicit conversion.
        return APIError(MJ_UNKNOWN, MN_NONE, 0);
    }
}
int CUDT::epoll_create()
{
try
{
return s_UDTUnited.epoll_create();
}
catch (const CUDTException& e)
{
return APIError(e);
}
catch (const std::exception& ee)
{
LOGC(aclog.Fatal, log << "epoll_create: UNEXPECTED EXCEPTION: "
<< typeid(ee).name() << ": " << ee.what());
return APIError(MJ_UNKNOWN, MN_NONE, 0);
}
}
int CUDT::epoll_clear_usocks(int eid)
{
try
{
return s_UDTUnited.epoll_clear_usocks(eid);
}
catch (const CUDTException& e)
{
return APIError(e);
}
catch (std::exception& ee)
{
LOGC(aclog.Fatal, log << "epoll_clear_usocks: UNEXPECTED EXCEPTION: "
<< typeid(ee).name() << ": " << ee.what());
return APIError(MJ_UNKNOWN, MN_NONE, 0);
}
}
int CUDT::epoll_add_usock(const int eid, const SRTSOCKET u, const int* events)
{
try
{
return s_UDTUnited.epoll_add_usock(eid, u, events);
}
catch (const CUDTException& e)
{
return APIError(e);
}
catch (const std::exception& ee)
{
LOGC(aclog.Fatal, log << "epoll_add_usock: UNEXPECTED EXCEPTION: "
<< typeid(ee).name() << ": " << ee.what());
return APIError(MJ_UNKNOWN, MN_NONE, 0);
}
}
int CUDT::epoll_add_ssock(const int eid, const SYSSOCKET s, const int* events)
{
try
{
return s_UDTUnited.epoll_add_ssock(eid, s, events);
}
catch (const CUDTException& e)
{
return APIError(e);
}
catch (const std::exception& ee)
{
LOGC(aclog.Fatal, log << "epoll_add_ssock: UNEXPECTED EXCEPTION: "
<< typeid(ee).name() << ": " << ee.what());
return APIError(MJ_UNKNOWN, MN_NONE, 0);
}
}
int CUDT::epoll_update_usock(
const int eid, const SRTSOCKET u, const int* events)
{
try
{
return s_UDTUnited.epoll_add_usock(eid, u, events);
}
catch (const CUDTException& e)
{
return APIError(e);
}
catch (const std::exception& ee)
{
LOGC(aclog.Fatal, log << "epoll_update_usock: UNEXPECTED EXCEPTION: "
<< typeid(ee).name() << ": " << ee.what());
return APIError(MJ_UNKNOWN, MN_NONE, 0);
}
}
int CUDT::epoll_update_ssock(
const int eid, const SYSSOCKET s, const int* events)
{
try
{
return s_UDTUnited.epoll_update_ssock(eid, s, events);
}
catch (const CUDTException& e)
{
return APIError(e);
}
catch (const std::exception& ee)
{
LOGC(aclog.Fatal, log << "epoll_update_ssock: UNEXPECTED EXCEPTION: "
<< typeid(ee).name() << ": " << ee.what());
return APIError(MJ_UNKNOWN, MN_NONE, 0);
}
}
int CUDT::epoll_remove_usock(const int eid, const SRTSOCKET u)
{
try
{
return s_UDTUnited.epoll_remove_usock(eid, u);
}
catch (const CUDTException& e)
{
return APIError(e);
}
catch (const std::exception& ee)
{
LOGC(aclog.Fatal, log << "epoll_remove_usock: UNEXPECTED EXCEPTION: "
<< typeid(ee).name() << ": " << ee.what());
return APIError(MJ_UNKNOWN, MN_NONE, 0);
}
}
int CUDT::epoll_remove_ssock(const int eid, const SYSSOCKET s)
{
try
{
return s_UDTUnited.epoll_remove_ssock(eid, s);
}
catch (const CUDTException& e)
{
return APIError(e);
}
catch (const std::exception& ee)
{
LOGC(aclog.Fatal, log << "epoll_remove_ssock: UNEXPECTED EXCEPTION: "
<< typeid(ee).name() << ": " << ee.what());
return APIError(MJ_UNKNOWN, MN_NONE, 0);
}
}
int CUDT::epoll_wait(
const int eid,
set<SRTSOCKET>* readfds,
set<SRTSOCKET>* writefds,
int64_t msTimeOut,
set<SYSSOCKET>* lrfds,
set<SYSSOCKET>* lwfds)
{
try
{
return s_UDTUnited.epoll_ref().wait(
eid, readfds, writefds, msTimeOut, lrfds, lwfds);
}
catch (const CUDTException& e)
{
return APIError(e);
}
catch (const std::exception& ee)
{
LOGC(aclog.Fatal, log << "epoll_wait: UNEXPECTED EXCEPTION: "
<< typeid(ee).name() << ": " << ee.what());
return APIError(MJ_UNKNOWN, MN_NONE, 0);
}
}
int CUDT::epoll_uwait(
const int eid,
SRT_EPOLL_EVENT* fdsSet,
int fdsSize,
int64_t msTimeOut)
{
try
{
return s_UDTUnited.epoll_uwait(eid, fdsSet, fdsSize, msTimeOut);
}
catch (const CUDTException& e)
{
return APIError(e);
}
catch (const std::exception& ee)
{
LOGC(aclog.Fatal, log << "epoll_uwait: UNEXPECTED EXCEPTION: "
<< typeid(ee).name() << ": " << ee.what());
return APIError(MJ_UNKNOWN, MN_NONE, 0);
}
}
int32_t CUDT::epoll_set(
const int eid,
int32_t flags)
{
try
{
return s_UDTUnited.epoll_set(eid, flags);
}
catch (const CUDTException& e)
{
return APIError(e);
}
catch (const std::exception& ee)
{
LOGC(aclog.Fatal, log << "epoll_set: UNEXPECTED EXCEPTION: "
<< typeid(ee).name() << ": " << ee.what());
return APIError(MJ_UNKNOWN, MN_NONE, 0);
}
}
int CUDT::epoll_release(const int eid)
{
try
{
return s_UDTUnited.epoll_release(eid);
}
catch (const CUDTException& e)
{
return APIError(e);
}
catch (const std::exception& ee)
{
LOGC(aclog.Fatal, log << "epoll_release: UNEXPECTED EXCEPTION: "
<< typeid(ee).name() << ": " << ee.what());
return APIError(MJ_UNKNOWN, MN_NONE, 0);
}
}
// Returns a reference to the error object obtained from
// GetThreadLocalError().
CUDTException& CUDT::getlasterror()
{
    CUDTException& lastError = GetThreadLocalError();
    return lastError;
}
// Static API wrapper that fills `perf` with statistics for `u`.
//
// With bonding enabled, an id carrying SRTGROUP_MASK is routed to
// groupsockbstats() (which does not receive `instantaneous`). Otherwise the
// socket is looked up with ERH_THROW and its CUDT::bstats(perf, clear,
// instantaneous) is invoked.
//
// Returns 0 on success; exceptions are converted into APIError return values.
int CUDT::bstats(SRTSOCKET u, CBytePerfMon* perf, bool clear, bool instantaneous)
{
#if ENABLE_EXPERIMENTAL_BONDING
    const bool isGroupId = (u & SRTGROUP_MASK) != 0;
    if (isGroupId)
        return groupsockbstats(u, perf, clear);
#endif

    try
    {
        s_UDTUnited.locateSocket(u, s_UDTUnited.ERH_THROW)->m_pUDT->bstats(perf, clear, instantaneous);
        return 0;
    }
    catch (const CUDTException& udtError)
    {
        return APIError(udtError);
    }
    catch (const std::exception& stdError)
    {
        // Not a library exception: log it and report an unknown error.
        LOGC(aclog.Fatal,
             log << "bstats: UNEXPECTED EXCEPTION: " << typeid(stdError).name() << ": " << stdError.what());
        return APIError(MJ_UNKNOWN, MN_NONE, 0);
    }
}
#if ENABLE_EXPERIMENTAL_BONDING
// Group variant of bstats(): looks up group `u` with ERH_THROW and calls its
// bstatsSocket(perf, clear).
//
// Returns 0 on success. On an exception the error is stored with
// SetThreadLocalError() and ERROR is returned.
int CUDT::groupsockbstats(SRTSOCKET u, CBytePerfMon* perf, bool clear)
{
    try
    {
        s_UDTUnited.locateGroup(u, s_UDTUnited.ERH_THROW)->bstatsSocket(perf, clear);
        return 0;
    }
    catch (const CUDTException& udtError)
    {
        SetThreadLocalError(udtError);
        return ERROR;
    }
    catch (const std::exception& stdError)
    {
        // Not a library exception: log it and record an unknown error.
        LOGC(aclog.Fatal,
             log << "bstats: UNEXPECTED EXCEPTION: " << typeid(stdError).name() << ": " << stdError.what());
        SetThreadLocalError(CUDTException(MJ_UNKNOWN, MN_NONE, 0));
        return ERROR;
    }
}
#endif
// Looks up socket `u` with ERH_THROW and returns its m_pUDT pointer.
//
// On an exception the error is stored with SetThreadLocalError() and NULL is
// returned.
CUDT* CUDT::getUDTHandle(SRTSOCKET u)
{
    try
    {
        CUDT* const handle = s_UDTUnited.locateSocket(u, s_UDTUnited.ERH_THROW)->m_pUDT;
        return handle;
    }
    catch (const CUDTException& udtError)
    {
        SetThreadLocalError(udtError);
        return NULL;
    }
    catch (const std::exception& stdError)
    {
        // Not a library exception: log it and record an unknown error.
        LOGC(aclog.Fatal,
             log << "getUDTHandle: UNEXPECTED EXCEPTION: " << typeid(stdError).name() << ": " << stdError.what());
        SetThreadLocalError(CUDTException(MJ_UNKNOWN, MN_NONE, 0));
        return NULL;
    }
}
// Returns the keys of s_UDTUnited.m_Sockets, in the container's iteration
// order, as a vector of socket ids.
//
// NOTE(review): no lock is taken in this function while m_Sockets is
// traversed -- confirm callers provide any synchronization that is needed.
vector<SRTSOCKET> CUDT::existingSockets()
{
    vector<SRTSOCKET> ids;

    CUDTUnited::sockets_t::iterator cur = s_UDTUnited.m_Sockets.begin();
    while (cur != s_UDTUnited.m_Sockets.end())
    {
        ids.push_back(cur->first);
        ++cur;
    }

    return ids;
}
// Returns the status of socket (or, with bonding enabled, group) `u`.
//
// With bonding enabled and isgroup(u) true, the group is looked up with
// ERH_THROW and its getStatus() is returned; otherwise the result of
// s_UDTUnited.getStatus(u) is returned.
//
// On an exception the error is stored with SetThreadLocalError() and
// SRTS_NONEXIST is returned.
SRT_SOCKSTATUS CUDT::getsockstate(SRTSOCKET u)
{
    try
    {
#if ENABLE_EXPERIMENTAL_BONDING
        if (isgroup(u))
            return s_UDTUnited.locateGroup(u, s_UDTUnited.ERH_THROW)->getStatus();
#endif
        const SRT_SOCKSTATUS status = s_UDTUnited.getStatus(u);
        return status;
    }
    catch (const CUDTException& udtError)
    {
        SetThreadLocalError(udtError);
        return SRTS_NONEXIST;
    }
    catch (const std::exception& stdError)
    {
        // Not a library exception: log it and record an unknown error.
        LOGC(aclog.Fatal,
             log << "getsockstate: UNEXPECTED EXCEPTION: " << typeid(stdError).name() << ": " << stdError.what());
        SetThreadLocalError(CUDTException(MJ_UNKNOWN, MN_NONE, 0));
        return SRTS_NONEXIST;
    }
}
namespace UDT
{
// Forwards to CUDT::startup() and returns its result.
int startup()
{
    const int status = CUDT::startup();
    return status;
}
// Forwards to CUDT::cleanup() and returns its result.
int cleanup()
{
    const int status = CUDT::cleanup();
    return status;
}
// Forwards (u, name, namelen) to the address-based CUDT::bind() overload and
// returns its result.
int bind(SRTSOCKET u, const struct sockaddr* name, int namelen)
{
    const int status = CUDT::bind(u, name, namelen);
    return status;
}
// Forwards (u, udpsock) to the CUDT::bind() overload taking a UDPSOCKET and
// returns its result.
int bind2(SRTSOCKET u, UDPSOCKET udpsock)
{
    const int status = CUDT::bind(u, udpsock);
    return status;
}
// Forwards (u, backlog) to CUDT::listen() and returns its result.
int listen(SRTSOCKET u, int backlog)
{
    const int status = CUDT::listen(u, backlog);
    return status;
}
// Forwards (u, addr, addrlen) to CUDT::accept() and returns the socket id it
// yields.
SRTSOCKET accept(SRTSOCKET u, struct sockaddr* addr, int* addrlen)
{
    const SRTSOCKET accepted = CUDT::accept(u, addr, addrlen);
    return accepted;
}
// Forwards to CUDT::connect(), passing SRT_SEQNO_NONE as the fourth argument
// (see connect_debug() for the variant where the caller supplies it).
int connect(SRTSOCKET u, const struct sockaddr* name, int namelen)
{
    const int status = CUDT::connect(u, name, namelen, SRT_SEQNO_NONE);
    return status;
}
// Forwards `u` to CUDT::close() and returns its result.
int close(SRTSOCKET u)
{
    const int status = CUDT::close(u);
    return status;
}
// Forwards (u, name, namelen) to CUDT::getpeername() and returns its result.
int getpeername(SRTSOCKET u, struct sockaddr* name, int* namelen)
{
    const int status = CUDT::getpeername(u, name, namelen);
    return status;
}
// Forwards (u, name, namelen) to CUDT::getsockname() and returns its result.
int getsockname(SRTSOCKET u, struct sockaddr* name, int* namelen)
{
    const int status = CUDT::getsockname(u, name, namelen);
    return status;
}
// Forwards every argument unchanged to CUDT::getsockopt() and returns its
// result.
int getsockopt(
    SRTSOCKET u, int level, SRT_SOCKOPT optname, void* optval, int* optlen)
{
    const int status = CUDT::getsockopt(u, level, optname, optval, optlen);
    return status;
}
// Forwards every argument unchanged to CUDT::setsockopt() and returns its
// result.
int setsockopt(
    SRTSOCKET u, int level, SRT_SOCKOPT optname, const void* optval, int optlen)
{
    const int status = CUDT::setsockopt(u, level, optname, optval, optlen);
    return status;
}
// Like connect(), but the caller supplies `forced_isn`, which is passed as
// the fourth argument of CUDT::connect() in place of SRT_SEQNO_NONE.
int connect_debug(
    SRTSOCKET u, const struct sockaddr* name, int namelen, int32_t forced_isn)
{
    const int status = CUDT::connect(u, name, namelen, forced_isn);
    return status;
}
// Forwards (u, buf, len, flags) to CUDT::send() and returns its result.
int send(SRTSOCKET u, const char* buf, int len, int flags)
{
    const int result = CUDT::send(u, buf, len, flags);
    return result;
}
// Forwards (u, buf, len, flags) to CUDT::recv() and returns its result.
int recv(SRTSOCKET u, char* buf, int len, int flags)
{
    const int result = CUDT::recv(u, buf, len, flags);
    return result;
}
// Forwards (u, buf, len, ttl, inorder, srctime) to CUDT::sendmsg() and
// returns its result.
int sendmsg(
    SRTSOCKET u, const char* buf, int len, int ttl, bool inorder,
    int64_t srctime)
{
    const int result = CUDT::sendmsg(u, buf, len, ttl, inorder, srctime);
    return result;
}
// Forwards (u, buf, len, srctime) to CUDT::recvmsg() and returns its result.
// `srctime` is passed by reference, so the callee can write to it.
int recvmsg(SRTSOCKET u, char* buf, int len, int64_t& srctime)
{
    const int result = CUDT::recvmsg(u, buf, len, srctime);
    return result;
}
// Overload of recvmsg() for callers that do not want the source time: a
// local is passed for the by-reference `srctime` argument of CUDT::recvmsg()
// and discarded afterwards.
int recvmsg(SRTSOCKET u, char* buf, int len)
{
    // Initialized so that nothing can ever read an indeterminate value
    // through the reference, regardless of what the callee does with it.
    int64_t srctime = 0;
    return CUDT::recvmsg(u, buf, len, srctime);
}
// Forwards (u, ifs, offset, size, block) to CUDT::sendfile() and returns its
// result. `ifs` and `offset` are passed by reference.
int64_t sendfile(
    SRTSOCKET u,
    fstream& ifs,
    int64_t& offset,
    int64_t size,
    int block)
{
    const int64_t result = CUDT::sendfile(u, ifs, offset, size, block);
    return result;
}
// Forwards (u, ofs, offset, size, block) to CUDT::recvfile() and returns its
// result. `ofs` and `offset` are passed by reference.
int64_t recvfile(
    SRTSOCKET u,
    fstream& ofs,
    int64_t& offset,
    int64_t size,
    int block)
{
    const int64_t result = CUDT::recvfile(u, ofs, offset, size, block);
    return result;
}
// Path-based variant of sendfile(): opens `path` as a binary input stream,
// passes it together with *offset to CUDT::sendfile(), closes the stream and
// returns the result of that call.
//
// NOTE(review): `offset` is dereferenced without a NULL check and the stream
// state is not inspected here -- presumably callers validate the arguments;
// confirm.
int64_t sendfile2(
    SRTSOCKET u,
    const char* path,
    int64_t* offset,
    int64_t size,
    int block)
{
    fstream source(path, ios::in | ios::binary);

    const int64_t result = CUDT::sendfile(u, source, *offset, size, block);

    source.close();
    return result;
}
// Path-based variant of recvfile(): opens `path` as a binary output stream,
// passes it together with *offset to CUDT::recvfile(), closes the stream and
// returns the result of that call.
//
// NOTE(review): `offset` is dereferenced without a NULL check and the stream
// state is not inspected here -- presumably callers validate the arguments;
// confirm.
int64_t recvfile2(
    SRTSOCKET u,
    const char* path,
    int64_t* offset,
    int64_t size,
    int block)
{
    fstream sink(path, ios::out | ios::binary);

    const int64_t result = CUDT::recvfile(u, sink, *offset, size, block);

    sink.close();
    return result;
}
// Forwards (nfds, readfds, writefds, exceptfds, timeout) to CUDT::select()
// and returns its result.
int select(
    int nfds,
    UDSET* readfds,
    UDSET* writefds,
    UDSET* exceptfds,
    const struct timeval* timeout)
{
    const int result = CUDT::select(nfds, readfds, writefds, exceptfds, timeout);
    return result;
}
// Forwards (fds, readfds, writefds, exceptfds, msTimeOut) to
// CUDT::selectEx() and returns its result.
int selectEx(
    const vector<SRTSOCKET>& fds,
    vector<SRTSOCKET>* readfds,
    vector<SRTSOCKET>* writefds,
    vector<SRTSOCKET>* exceptfds,
    int64_t msTimeOut)
{
    const int result = CUDT::selectEx(fds, readfds, writefds, exceptfds, msTimeOut);
    return result;
}
// Forwards to CUDT::epoll_create() and returns its result.
int epoll_create()
{
    const int result = CUDT::epoll_create();
    return result;
}
// Forwards `eid` to CUDT::epoll_clear_usocks() and returns its result.
int epoll_clear_usocks(int eid)
{
    const int result = CUDT::epoll_clear_usocks(eid);
    return result;
}
// Forwards (eid, u, events) to CUDT::epoll_add_usock() and returns its
// result.
int epoll_add_usock(int eid, SRTSOCKET u, const int* events)
{
    const int result = CUDT::epoll_add_usock(eid, u, events);
    return result;
}
// Forwards (eid, s, events) to CUDT::epoll_add_ssock() and returns its
// result.
int epoll_add_ssock(int eid, SYSSOCKET s, const int* events)
{
    const int result = CUDT::epoll_add_ssock(eid, s, events);
    return result;
}
// Forwards (eid, u, events) to CUDT::epoll_update_usock() and returns its
// result.
int epoll_update_usock(int eid, SRTSOCKET u, const int* events)
{
    const int result = CUDT::epoll_update_usock(eid, u, events);
    return result;
}
// Forwards (eid, s, events) to CUDT::epoll_update_ssock() and returns its
// result.
int epoll_update_ssock(int eid, SYSSOCKET s, const int* events)
{
    const int result = CUDT::epoll_update_ssock(eid, s, events);
    return result;
}
// Forwards (eid, u) to CUDT::epoll_remove_usock() and returns its result.
int epoll_remove_usock(int eid, SRTSOCKET u)
{
    const int result = CUDT::epoll_remove_usock(eid, u);
    return result;
}
// Forwards (eid, s) to CUDT::epoll_remove_ssock() and returns its result.
int epoll_remove_ssock(int eid, SYSSOCKET s)
{
    const int result = CUDT::epoll_remove_ssock(eid, s);
    return result;
}
// Forwards (eid, readfds, writefds, msTimeOut, lrfds, lwfds) to
// CUDT::epoll_wait() and returns its result.
int epoll_wait(
    int eid,
    set<SRTSOCKET>* readfds,
    set<SRTSOCKET>* writefds,
    int64_t msTimeOut,
    set<SYSSOCKET>* lrfds,
    set<SYSSOCKET>* lwfds)
{
    const int result = CUDT::epoll_wait(eid, readfds, writefds, msTimeOut, lrfds, lwfds);
    return result;
}
// Copies the elements of *val, in the set's iteration order, into the
// caller-supplied array `fds`.
//
// val  source set; nothing happens if NULL
// num  in: capacity of `fds`; out: clamped down to val->size() when the set
//      holds fewer elements than the capacity; nothing happens if NULL
// fds  destination array; nothing happens if NULL
//
// At most *num elements are written, so a set larger than the capacity is
// truncated and *num is left unchanged in that case.
template <class SOCKTYPE>
inline void set_result(std::set<SOCKTYPE>* val, int* num, SOCKTYPE* fds)
{
    if (val == NULL || num == NULL || fds == NULL)
        return;

    // Shrink the reported count to what the set can actually provide
    // (becomes 0 for an empty set).
    const int available = int(val->size());
    if (*num > available)
        *num = available;

    int written = 0;
    typename std::set<SOCKTYPE>::const_iterator cur = val->begin();
    while (cur != val->end() && written < *num)
    {
        fds[written] = *cur;
        ++written;
        ++cur;
    }
}
// Array-based front end for CUDT::epoll_wait().
//
// Each (array, count) pair is optional: a result set is requested from
// CUDT::epoll_wait() only when both the array pointer and its count pointer
// are non-NULL. When the wait returns a positive value, every requested set
// is copied into its array with set_result(), which also updates the count.
// The value returned by CUDT::epoll_wait() is returned unchanged.
int epoll_wait2(
    int eid, SRTSOCKET* readfds,
    int* rnum, SRTSOCKET* writefds,
    int* wnum,
    int64_t msTimeOut,
    SYSSOCKET* lrfds,
    int* lrnum,
    SYSSOCKET* lwfds,
    int* lwnum)
{
    // Local storage for the results; only the requested ones are passed on.
    set<SRTSOCKET> readset;
    set<SRTSOCKET> writeset;
    set<SYSSOCKET> lrset;
    set<SYSSOCKET> lwset;

    set<SRTSOCKET>* rval  = (readfds != NULL && rnum != NULL) ? &readset : NULL;
    set<SRTSOCKET>* wval  = (writefds != NULL && wnum != NULL) ? &writeset : NULL;
    set<SYSSOCKET>* lrval = (lrfds != NULL && lrnum != NULL) ? &lrset : NULL;
    set<SYSSOCKET>* lwval = (lwfds != NULL && lwnum != NULL) ? &lwset : NULL;

    const int ret = CUDT::epoll_wait(eid, rval, wval, msTimeOut, lrval, lwval);
    if (ret <= 0)
        return ret;

    set_result(rval, rnum, readfds);
    set_result(wval, wnum, writefds);
    set_result(lrval, lrnum, lrfds);
    set_result(lwval, lwnum, lwfds);
    return ret;
}
// Forwards (eid, fdsSet, fdsSize, msTimeOut) to CUDT::epoll_uwait() and
// returns its result.
int epoll_uwait(int eid, SRT_EPOLL_EVENT* fdsSet, int fdsSize, int64_t msTimeOut)
{
    const int result = CUDT::epoll_uwait(eid, fdsSet, fdsSize, msTimeOut);
    return result;
}
// Forwards `eid` to CUDT::epoll_release() and returns its result.
int epoll_release(int eid)
{
    const int result = CUDT::epoll_release(eid);
    return result;
}
// Returns the reference obtained from CUDT::getlasterror().
ERRORINFO& getlasterror()
{
    ERRORINFO& lastError = CUDT::getlasterror();
    return lastError;
}
int getlasterror_code()
{
return CUDT::getlasterror().getErrorCode();
}
const char* getlasterror_desc()
{
return CUDT::getlasterror().getErrorMessage();
}
int getlasterror_errno()
{
return CUDT::getlasterror().getErrno();
}
const char* geterror_desc(int code, int err)
{
CUDTException e (CodeMajor(code/1000), CodeMinor(code%1000), err);
return(e.getErrorMessage());
}
// Forwards (u, perf, clear) to CUDT::bstats() and returns its result.
int bstats(SRTSOCKET u, SRT_TRACEBSTATS* perf, bool clear)
{
    const int status = CUDT::bstats(u, perf, clear);
    return status;
}
// Forwards `u` to CUDT::getsockstate() and returns the status it reports.
SRT_SOCKSTATUS getsockstate(SRTSOCKET u)
{
    const SRT_SOCKSTATUS status = CUDT::getsockstate(u);
    return status;
}
}
namespace srt
{
// Stores `ll` in srt_logger_config.max_level while holding the logger
// configuration mutex.
void setloglevel(LogLevel::type ll)
{
    ScopedLock configGuard(srt_logger_config.mutex);
    srt_logger_config.max_level = ll;
}
// Sets entry `fa` of srt_logger_config.enabled_fa to true while holding the
// logger configuration mutex.
void addlogfa(LogFA fa)
{
    ScopedLock configGuard(srt_logger_config.mutex);
    const bool enabled = true;
    srt_logger_config.enabled_fa.set(fa, enabled);
}
// Sets entry `fa` of srt_logger_config.enabled_fa to false while holding the
// logger configuration mutex.
void dellogfa(LogFA fa)
{
    ScopedLock configGuard(srt_logger_config.mutex);
    const bool enabled = false;
    srt_logger_config.enabled_fa.set(fa, enabled);
}
// Replaces the enabled-FA configuration with the contents of `fas`: for
// every index from 0 through SRT_LOGFA_LASTNONE (inclusive), the entry is
// enabled exactly when that index is present in `fas`. Runs under the logger
// configuration mutex.
void resetlogfa(set<LogFA> fas)
{
    ScopedLock configGuard(srt_logger_config.mutex);

    int fa = 0;
    while (fa <= SRT_LOGFA_LASTNONE)
    {
        const bool enabled = fas.count(fa) != 0;
        srt_logger_config.enabled_fa.set(fa, enabled);
        ++fa;
    }
}
// Replaces the enabled-FA configuration with the entries listed in the array
// `fara` of length `fara_size`: everything is reset first, then each listed
// entry is set to true. Runs under the logger configuration mutex.
void resetlogfa(const int* fara, size_t fara_size)
{
    ScopedLock configGuard(srt_logger_config.mutex);

    srt_logger_config.enabled_fa.reset();
    for (size_t idx = 0; idx < fara_size; ++idx)
    {
        srt_logger_config.enabled_fa.set(fara[idx], true);
    }
}
// Stores the address of `stream` in srt_logger_config.log_stream while
// holding the logger configuration mutex. Only the pointer is kept, so the
// stream object must stay alive for as long as it remains configured.
void setlogstream(std::ostream& stream)
{
    ScopedLock configGuard(srt_logger_config.mutex);
    std::ostream* const target = &stream;
    srt_logger_config.log_stream = target;
}
// Stores `opaque` and `handler` in srt_logger_config (loghandler_opaque and
// loghandler_fn) while holding the logger configuration mutex, so both
// fields are updated under the same lock.
void setloghandler(void* opaque, SRT_LOG_HANDLER_FN* handler)
{
    ScopedLock configGuard(srt_logger_config.mutex);

    srt_logger_config.loghandler_opaque = opaque;
    srt_logger_config.loghandler_fn     = handler;
}
// Stores `flags` in srt_logger_config.flags while holding the logger
// configuration mutex.
void setlogflags(int flags)
{
    ScopedLock configGuard(srt_logger_config.mutex);
    srt_logger_config.flags = flags;
}
// Forwards (u, sid) to CUDT::setstreamid() and returns its result.
SRT_API bool setstreamid(SRTSOCKET u, const std::string& sid)
{
    const bool accepted = CUDT::setstreamid(u, sid);
    return accepted;
}
// Forwards `u` to CUDT::getstreamid() and returns the resulting string.
SRT_API std::string getstreamid(SRTSOCKET u)
{
    std::string streamId = CUDT::getstreamid(u);
    return streamId;
}
// Forwards `u` to the one-argument CUDT::rejectReason() and returns its
// result.
int getrejectreason(SRTSOCKET u)
{
    const int reason = CUDT::rejectReason(u);
    return reason;
}
// Forwards (u, value) to the two-argument CUDT::rejectReason() and returns
// its result.
int setrejectreason(SRTSOCKET u, int value)
{
    const int status = CUDT::rejectReason(u, value);
    return status;
}
}