#include "platform_sys.h"
#include <cstring>
#include "common.h"
#include "api.h"
#include "netinet_any.h"
#include "threadname.h"
#include "logging.h"
#include "queue.h"
using namespace std;
using namespace srt::sync;
using namespace srt_logging;
CUnitQueue::CUnitQueue()
: m_pQEntry(NULL)
, m_pCurrQueue(NULL)
, m_pLastQueue(NULL)
, m_iSize(0)
, m_iCount(0)
, m_iMSS()
, m_iIPversion()
{
}
CUnitQueue::~CUnitQueue()
{
CQEntry *p = m_pQEntry;
while (p != NULL)
{
delete[] p->m_pUnit;
delete[] p->m_pBuffer;
CQEntry *q = p;
if (p == m_pLastQueue)
p = NULL;
else
p = p->m_pNext;
delete q;
}
}
int CUnitQueue::init(int size, int mss, int version)
{
CQEntry *tempq = NULL;
CUnit * tempu = NULL;
char * tempb = NULL;
try
{
tempq = new CQEntry;
tempu = new CUnit[size];
tempb = new char[size * mss];
}
catch (...)
{
delete tempq;
delete[] tempu;
delete[] tempb;
return -1;
}
for (int i = 0; i < size; ++i)
{
tempu[i].m_iFlag = CUnit::FREE;
tempu[i].m_Packet.m_pcData = tempb + i * mss;
}
tempq->m_pUnit = tempu;
tempq->m_pBuffer = tempb;
tempq->m_iSize = size;
m_pQEntry = m_pCurrQueue = m_pLastQueue = tempq;
m_pQEntry->m_pNext = m_pQEntry;
m_pAvailUnit = m_pCurrQueue->m_pUnit;
m_iSize = size;
m_iMSS = mss;
m_iIPversion = version;
return 0;
}
int CUnitQueue::increase()
{
int real_count = 0;
CQEntry *p = m_pQEntry;
while (p != NULL)
{
CUnit *u = p->m_pUnit;
for (CUnit *end = u + p->m_iSize; u != end; ++u)
if (u->m_iFlag != CUnit::FREE)
++real_count;
if (p == m_pLastQueue)
p = NULL;
else
p = p->m_pNext;
}
m_iCount = real_count;
if (double(m_iCount) / m_iSize < 0.9)
return -1;
CQEntry *tempq = NULL;
CUnit * tempu = NULL;
char * tempb = NULL;
const int size = m_pQEntry->m_iSize;
try
{
tempq = new CQEntry;
tempu = new CUnit[size];
tempb = new char[size * m_iMSS];
}
catch (...)
{
delete tempq;
delete[] tempu;
delete[] tempb;
LOGC(rslog.Error,
log << "CUnitQueue:increase: failed to allocate " << size << " new units."
<< " Current size=" << m_iSize);
return -1;
}
for (int i = 0; i < size; ++i)
{
tempu[i].m_iFlag = CUnit::FREE;
tempu[i].m_Packet.m_pcData = tempb + i * m_iMSS;
}
tempq->m_pUnit = tempu;
tempq->m_pBuffer = tempb;
tempq->m_iSize = size;
m_pLastQueue->m_pNext = tempq;
m_pLastQueue = tempq;
m_pLastQueue->m_pNext = m_pQEntry;
m_iSize += size;
return 0;
}
int CUnitQueue::shrink()
{
return -1;
}
CUnit *CUnitQueue::getNextAvailUnit()
{
if (m_iCount * 10 > m_iSize * 9)
increase();
if (m_iCount >= m_iSize)
return NULL;
int units_checked = 0;
do
{
const CUnit *end = m_pCurrQueue->m_pUnit + m_pCurrQueue->m_iSize;
for (; m_pAvailUnit != end; ++m_pAvailUnit, ++units_checked)
{
if (m_pAvailUnit->m_iFlag == CUnit::FREE)
{
return m_pAvailUnit;
}
}
m_pCurrQueue = m_pCurrQueue->m_pNext;
m_pAvailUnit = m_pCurrQueue->m_pUnit;
} while (units_checked < m_iSize);
increase();
return NULL;
}
void CUnitQueue::makeUnitFree(CUnit *unit)
{
SRT_ASSERT(unit != NULL);
SRT_ASSERT(unit->m_iFlag != CUnit::FREE);
unit->m_iFlag = CUnit::FREE;
--m_iCount;
}
void CUnitQueue::makeUnitGood(CUnit *unit)
{
SRT_ASSERT(unit != NULL);
SRT_ASSERT(unit->m_iFlag == CUnit::FREE);
unit->m_iFlag = CUnit::GOOD;
++m_iCount;
}
CSndUList::CSndUList()
: m_pHeap(NULL)
, m_iArrayLength(512)
, m_iLastEntry(-1)
, m_ListLock()
, m_pWindowLock(NULL)
, m_pWindowCond(NULL)
, m_pTimer(NULL)
{
m_pHeap = new CSNode *[m_iArrayLength];
}
CSndUList::~CSndUList()
{
delete[] m_pHeap;
}
void CSndUList::update(const CUDT* u, EReschedule reschedule)
{
ScopedLock listguard(m_ListLock);
CSNode* n = u->m_pSNode;
if (n->m_iHeapLoc >= 0)
{
if (!reschedule) return;
if (n->m_iHeapLoc == 0)
{
n->m_tsTimeStamp = steady_clock::now();
m_pTimer->interrupt();
return;
}
remove_(u);
insert_norealloc_(steady_clock::now(), u);
return;
}
insert_(steady_clock::now(), u);
}
int CSndUList::pop(sockaddr_any& w_addr, CPacket& w_pkt)
{
ScopedLock listguard(m_ListLock);
if (-1 == m_iLastEntry)
return -1;
if (m_pHeap[0]->m_tsTimeStamp > steady_clock::now())
return -1;
CUDT *u = m_pHeap[0]->m_pUDT;
remove_(u);
#define UST(field) ((u->m_b##field) ? "+" : "-") << #field << " "
HLOGC(qslog.Debug,
log << "SND:pop: requesting packet from @" << u->socketID() << " STATUS: " << UST(Listening)
<< UST(Connecting) << UST(Connected) << UST(Closing) << UST(Shutdown) << UST(Broken) << UST(PeerHealth)
<< UST(Opened));
#undef UST
if (!u->m_bConnected || u->m_bBroken)
return -1;
const std::pair<int, steady_clock::time_point> res_time = u->packData((w_pkt));
if (res_time.first <= 0)
return -1;
w_addr = u->m_PeerAddr;
const steady_clock::time_point send_time = res_time.second;
if (!is_zero(send_time))
insert_norealloc_(send_time, u);
return 1;
}
void CSndUList::remove(const CUDT *u)
{
ScopedLock listguard(m_ListLock);
remove_(u);
}
steady_clock::time_point CSndUList::getNextProcTime()
{
ScopedLock listguard(m_ListLock);
if (-1 == m_iLastEntry)
return steady_clock::time_point();
return m_pHeap[0]->m_tsTimeStamp;
}
void CSndUList::realloc_()
{
CSNode **temp = NULL;
try
{
temp = new CSNode *[2 * m_iArrayLength];
}
catch (...)
{
throw CUDTException(MJ_SYSTEMRES, MN_MEMORY, 0);
}
memcpy((temp), m_pHeap, sizeof(CSNode *) * m_iArrayLength);
m_iArrayLength *= 2;
delete[] m_pHeap;
m_pHeap = temp;
}
void CSndUList::insert_(const steady_clock::time_point& ts, const CUDT* u)
{
if (m_iLastEntry == m_iArrayLength - 1)
realloc_();
insert_norealloc_(ts, u);
}
void CSndUList::insert_norealloc_(const steady_clock::time_point& ts, const CUDT* u)
{
CSNode *n = u->m_pSNode;
if (n->m_iHeapLoc >= 0)
return;
SRT_ASSERT(m_iLastEntry < m_iArrayLength);
m_iLastEntry++;
m_pHeap[m_iLastEntry] = n;
n->m_tsTimeStamp = ts;
int q = m_iLastEntry;
int p = q;
while (p != 0)
{
p = (q - 1) >> 1;
if (m_pHeap[p]->m_tsTimeStamp <= m_pHeap[q]->m_tsTimeStamp)
break;
swap(m_pHeap[p], m_pHeap[q]);
m_pHeap[q]->m_iHeapLoc = q;
q = p;
}
n->m_iHeapLoc = q;
if (n->m_iHeapLoc == 0)
m_pTimer->interrupt();
if (0 == m_iLastEntry)
{
CSync::lock_signal(*m_pWindowCond, *m_pWindowLock);
}
}
void CSndUList::remove_(const CUDT* u)
{
CSNode *n = u->m_pSNode;
if (n->m_iHeapLoc >= 0)
{
m_pHeap[n->m_iHeapLoc] = m_pHeap[m_iLastEntry];
m_iLastEntry--;
m_pHeap[n->m_iHeapLoc]->m_iHeapLoc = n->m_iHeapLoc;
int q = n->m_iHeapLoc;
int p = q * 2 + 1;
while (p <= m_iLastEntry)
{
if ((p + 1 <= m_iLastEntry) && (m_pHeap[p]->m_tsTimeStamp > m_pHeap[p + 1]->m_tsTimeStamp))
p++;
if (m_pHeap[q]->m_tsTimeStamp > m_pHeap[p]->m_tsTimeStamp)
{
swap(m_pHeap[p], m_pHeap[q]);
m_pHeap[p]->m_iHeapLoc = p;
m_pHeap[q]->m_iHeapLoc = q;
q = p;
p = q * 2 + 1;
}
else
break;
}
n->m_iHeapLoc = -1;
}
if (0 == m_iLastEntry)
m_pTimer->interrupt();
}
CSndQueue::CSndQueue()
: m_pSndUList(NULL)
, m_pChannel(NULL)
, m_pTimer(NULL)
, m_WindowCond()
, m_bClosing(false)
{
setupCond(m_WindowCond, "Window");
}
CSndQueue::~CSndQueue()
{
m_bClosing = true;
if (m_pTimer != NULL)
{
m_pTimer->interrupt();
}
CSync::lock_signal(m_WindowCond, m_WindowLock);
if (m_WorkerThread.joinable())
{
HLOGC(rslog.Debug, log << "SndQueue: EXIT");
m_WorkerThread.join();
}
releaseCond(m_WindowCond);
delete m_pSndUList;
}
#if ENABLE_LOGGING
int CSndQueue::m_counter = 0;
#endif
void CSndQueue::init(CChannel *c, CTimer *t)
{
m_pChannel = c;
m_pTimer = t;
m_pSndUList = new CSndUList;
m_pSndUList->m_pWindowLock = &m_WindowLock;
m_pSndUList->m_pWindowCond = &m_WindowCond;
m_pSndUList->m_pTimer = m_pTimer;
#if ENABLE_LOGGING
++m_counter;
const std::string thrname = "SRT:SndQ:w" + Sprint(m_counter);
const char* thname = thrname.c_str();
#else
const char* thname = "SRT:SndQ";
#endif
if (!StartThread(m_WorkerThread, CSndQueue::worker, this, thname))
throw CUDTException(MJ_SYSTEMRES, MN_THREAD);
}
int CSndQueue::getIpTTL() const { return m_pChannel ? m_pChannel->getIpTTL() : -1; }
int CSndQueue::getIpToS() const { return m_pChannel ? m_pChannel->getIpToS() : -1; }
#ifdef SRT_ENABLE_BINDTODEVICE
bool CSndQueue::getBind(char* dst, size_t len) const { return m_pChannel ? m_pChannel->getBind(dst, len) : false; }
#endif
void *CSndQueue::worker(void *param)
{
CSndQueue *self = (CSndQueue *)param;
THREAD_STATE_INIT("SRT:SndQ:worker");
#if defined(SRT_DEBUG_SNDQ_HIGHRATE)
CTimer::rdtsc(self->m_ullDbgTime);
self->m_ullDbgPeriod = uint64_t(5000000) * CTimer::getCPUFrequency();
self->m_ullDbgTime += self->m_ullDbgPeriod;
#endif
while (!self->m_bClosing)
{
const steady_clock::time_point next_time = self->m_pSndUList->getNextProcTime();
#if defined(SRT_DEBUG_SNDQ_HIGHRATE)
self->m_WorkerStats.lIteration++;
#endif
if (is_zero(next_time))
{
#if defined(SRT_DEBUG_SNDQ_HIGHRATE)
self->m_WorkerStats.lNotReadyTs++;
#endif
UniqueLock windlock (self->m_WindowLock);
CSync windsync (self->m_WindowCond, windlock);
THREAD_PAUSED();
if (!self->m_bClosing && (self->m_pSndUList->m_iLastEntry < 0))
{
windsync.wait();
#if defined(SRT_DEBUG_SNDQ_HIGHRATE)
self->m_WorkerStats.lCondWait++;
#endif
}
THREAD_RESUMED();
continue;
}
const steady_clock::time_point currtime = steady_clock::now();
#if defined(SRT_DEBUG_SNDQ_HIGHRATE)
if (self->m_ullDbgTime <= currtime)
{
fprintf(stdout,
"SndQueue %lu slt:%lu nrp:%lu snt:%lu nrt:%lu ctw:%lu\n",
self->m_WorkerStats.lIteration,
self->m_WorkerStats.lSleepTo,
self->m_WorkerStats.lNotReadyPop,
self->m_WorkerStats.lSendTo,
self->m_WorkerStats.lNotReadyTs,
self->m_WorkerStats.lCondWait);
memset(&self->m_WorkerStats, 0, sizeof(self->m_WorkerStats));
self->m_ullDbgTime = currtime + self->m_ullDbgPeriod;
}
#endif
THREAD_PAUSED();
if (currtime < next_time)
{
self->m_pTimer->sleep_until(next_time);
#if defined(HAI_DEBUG_SNDQ_HIGHRATE)
self->m_WorkerStats.lSleepTo++;
#endif
}
THREAD_RESUMED();
sockaddr_any addr;
CPacket pkt;
if (self->m_pSndUList->pop((addr), (pkt)) < 0)
{
continue;
#if defined(SRT_DEBUG_SNDQ_HIGHRATE)
self->m_WorkerStats.lNotReadyPop++;
#endif
}
HLOGC(qslog.Debug, log << self->CONID() << "chn:SENDING: " << pkt.Info());
self->m_pChannel->sendto(addr, pkt);
#if defined(SRT_DEBUG_SNDQ_HIGHRATE)
self->m_WorkerStats.lSendTo++;
#endif
}
THREAD_EXIT();
return NULL;
}
int CSndQueue::sendto(const sockaddr_any& w_addr, CPacket& w_packet)
{
m_pChannel->sendto(w_addr, w_packet);
return (int)w_packet.getLength();
}
CRcvUList::CRcvUList()
: m_pUList(NULL)
, m_pLast(NULL)
{
}
CRcvUList::~CRcvUList() {}
void CRcvUList::insert(const CUDT *u)
{
CRNode *n = u->m_pRNode;
n->m_tsTimeStamp = steady_clock::now();
if (NULL == m_pUList)
{
n->m_pPrev = n->m_pNext = NULL;
m_pLast = m_pUList = n;
return;
}
n->m_pPrev = m_pLast;
n->m_pNext = NULL;
m_pLast->m_pNext = n;
m_pLast = n;
}
void CRcvUList::remove(const CUDT *u)
{
CRNode *n = u->m_pRNode;
if (!n->m_bOnList)
return;
if (NULL == n->m_pPrev)
{
m_pUList = n->m_pNext;
if (NULL == m_pUList)
m_pLast = NULL;
else
m_pUList->m_pPrev = NULL;
}
else
{
n->m_pPrev->m_pNext = n->m_pNext;
if (NULL == n->m_pNext)
{
m_pLast = n->m_pPrev;
}
else
n->m_pNext->m_pPrev = n->m_pPrev;
}
n->m_pNext = n->m_pPrev = NULL;
}
void CRcvUList::update(const CUDT *u)
{
CRNode *n = u->m_pRNode;
if (!n->m_bOnList)
return;
n->m_tsTimeStamp = steady_clock::now();
if (NULL == n->m_pNext)
return;
if (NULL == n->m_pPrev)
{
m_pUList = n->m_pNext;
m_pUList->m_pPrev = NULL;
}
else
{
n->m_pPrev->m_pNext = n->m_pNext;
n->m_pNext->m_pPrev = n->m_pPrev;
}
n->m_pPrev = m_pLast;
n->m_pNext = NULL;
m_pLast->m_pNext = n;
m_pLast = n;
}
CHash::CHash()
: m_pBucket(NULL)
, m_iHashSize(0)
{
}
CHash::~CHash()
{
for (int i = 0; i < m_iHashSize; ++i)
{
CBucket *b = m_pBucket[i];
while (NULL != b)
{
CBucket *n = b->m_pNext;
delete b;
b = n;
}
}
delete[] m_pBucket;
}
void CHash::init(int size)
{
m_pBucket = new CBucket *[size];
for (int i = 0; i < size; ++i)
m_pBucket[i] = NULL;
m_iHashSize = size;
}
CUDT *CHash::lookup(int32_t id)
{
CBucket *b = m_pBucket[id % m_iHashSize];
while (NULL != b)
{
if (id == b->m_iID)
return b->m_pUDT;
b = b->m_pNext;
}
return NULL;
}
void CHash::insert(int32_t id, CUDT *u)
{
CBucket *b = m_pBucket[id % m_iHashSize];
CBucket *n = new CBucket;
n->m_iID = id;
n->m_pUDT = u;
n->m_pNext = b;
m_pBucket[id % m_iHashSize] = n;
}
void CHash::remove(int32_t id)
{
CBucket *b = m_pBucket[id % m_iHashSize];
CBucket *p = NULL;
while (NULL != b)
{
if (id == b->m_iID)
{
if (NULL == p)
m_pBucket[id % m_iHashSize] = b->m_pNext;
else
p->m_pNext = b->m_pNext;
delete b;
return;
}
p = b;
b = b->m_pNext;
}
}
CRendezvousQueue::CRendezvousQueue()
: m_lRendezvousID()
, m_RIDVectorLock()
{
}
CRendezvousQueue::~CRendezvousQueue()
{
m_lRendezvousID.clear();
}
void CRendezvousQueue::insert(
const SRTSOCKET& id, CUDT* u, const sockaddr_any& addr, const steady_clock::time_point& ttl)
{
ScopedLock vg(m_RIDVectorLock);
CRL r;
r.m_iID = id;
r.m_pUDT = u;
r.m_PeerAddr = addr;
r.m_tsTTL = ttl;
m_lRendezvousID.push_back(r);
HLOGC(cnlog.Debug, log << "RID: adding socket @" << id << " for address: " << addr.str()
<< " expires: " << FormatTime(ttl)
<< " (total connectors: " << m_lRendezvousID.size() << ")");
}
void CRendezvousQueue::remove(const SRTSOCKET &id, bool should_lock)
{
if (should_lock)
enterCS(m_RIDVectorLock);
for (list<CRL>::iterator i = m_lRendezvousID.begin(); i != m_lRendezvousID.end(); ++i)
{
if (i->m_iID == id)
{
m_lRendezvousID.erase(i);
break;
}
}
if (should_lock)
leaveCS(m_RIDVectorLock);
}
CUDT* CRendezvousQueue::retrieve(const sockaddr_any& addr, SRTSOCKET& w_id)
{
ScopedLock vg(m_RIDVectorLock);
for (list<CRL>::iterator i = m_lRendezvousID.begin(); i != m_lRendezvousID.end(); ++i)
{
if (i->m_PeerAddr == addr && ((w_id == 0) || (w_id == i->m_iID)))
{
HLOGC(cnlog.Debug, log << "RID: found id @" << i->m_iID << " while looking for "
<< (w_id ? "THIS ID FROM " : "A NEW CONNECTION FROM ")
<< i->m_PeerAddr.str());
w_id = i->m_iID;
return i->m_pUDT;
}
}
#if ENABLE_HEAVY_LOGGING
std::ostringstream spec;
if (w_id == 0)
spec << "A NEW CONNECTION REQUEST";
else
spec << " AGENT @" << w_id;
HLOGC(cnlog.Debug, log << "RID: NO CONNECTOR FOR ADR:" << addr.str()
<< " while looking for " << spec.str() << " (" << m_lRendezvousID.size() << " connectors total)");
#endif
return NULL;
}
void CRendezvousQueue::updateConnStatus(EReadStatus rst, EConnectStatus cst, const CPacket &response)
{
ScopedLock vg(m_RIDVectorLock);
if (m_lRendezvousID.empty())
return;
HLOGC(cnlog.Debug,
log << "updateConnStatus: updating after getting pkt id=" << response.m_iID
<< " status: " << ConnectStatusStr(cst));
#if ENABLE_HEAVY_LOGGING
int debug_nupd = 0;
int debug_nrun = 0;
int debug_nfail = 0;
#endif
for (list<CRL>::iterator i = m_lRendezvousID.begin(), i_next = i; i != m_lRendezvousID.end(); i = i_next)
{
++i_next;
if (rst == RST_AGAIN || i->m_iID != response.m_iID)
{
const steady_clock::time_point then = i->m_pUDT->m_tsLastReqTime;
const steady_clock::time_point now = steady_clock::now();
const steady_clock::duration timeout_250ms = milliseconds_from(250);
const bool now_is_time = (now - then) > timeout_250ms;
HLOGC(cnlog.Debug,
log << "RID:@" << i->m_iID << " then=" << FormatTime(then)
<< " now=" << FormatTime(now) << " passed=" << count_microseconds(now - then)
<< "<=> 250000 -- now's " << (now_is_time ? "" : "NOT ") << "the time");
if (!now_is_time)
continue;
}
HLOGC(cnlog.Debug, log << "RID:@" << i->m_iID << " cst=" << ConnectStatusStr(cst) << " -- sending update NOW.");
#if ENABLE_HEAVY_LOGGING
++debug_nrun;
#endif
const steady_clock::time_point now = steady_clock::now();
if (now >= i->m_tsTTL)
{
HLOGC(cnlog.Debug, log << "RID: socket @" << i->m_iID
<< " removed - EXPIRED ("
<< (is_zero(i->m_tsTTL) ? "enforced on FAILURE" : "passed TTL")
<< "). removing from queue");
i->m_pUDT->m_bConnecting = false;
if (!is_zero(i->m_tsTTL))
{
i->m_pUDT->m_RejectReason = SRT_REJ_TIMEOUT;
}
else if (i->m_pUDT->m_RejectReason == SRT_REJ_UNKNOWN)
{
i->m_pUDT->m_RejectReason = SRT_REJ_PEER;
}
CUDT::s_UDTUnited.m_EPoll.update_events(i->m_iID, i->m_pUDT->m_sPollID, SRT_EPOLL_ERR, true);
int token = -1;
#if ENABLE_EXPERIMENTAL_BONDING
if (i->m_pUDT->m_parent->m_IncludedGroup)
{
token = i->m_pUDT->m_parent->m_IncludedGroup->updateFailedLink(i->m_iID);
}
#endif
CGlobEvent::triggerEvent();
if (i->m_pUDT->m_cbConnectHook)
{
CALLBACK_CALL(i->m_pUDT->m_cbConnectHook, i->m_iID,
i->m_pUDT->m_RejectReason == SRT_REJ_TIMEOUT ? SRT_ENOSERVER : SRT_ECONNREJ,
i->m_PeerAddr.get(), token);
}
i_next = m_lRendezvousID.erase(i);
continue;
}
else
{
HLOGC(cnlog.Debug, log << "RID: socket @" << i->m_iID << " still active (remaining "
<< std::fixed << (count_microseconds(i->m_tsTTL - now)/1000000.0) << "s of TTL)...");
}
if (!i->m_pUDT->m_bSynRecving)
{
#if ENABLE_HEAVY_LOGGING
++debug_nupd;
#endif
EReadStatus read_st = rst;
EConnectStatus conn_st = cst;
if (i->m_iID != response.m_iID)
{
read_st = RST_AGAIN;
conn_st = CONN_AGAIN;
}
if (!i->m_pUDT->processAsyncConnectRequest(read_st, conn_st, response, i->m_PeerAddr))
{
LOGC(cnlog.Error, log << "RendezvousQueue: processAsyncConnectRequest FAILED. Setting TTL as EXPIRED.");
i->m_pUDT->sendCtrl(UMSG_SHUTDOWN);
i->m_tsTTL = steady_clock::time_point(); #if ENABLE_HEAVY_LOGGING
++debug_nfail;
#endif
}
continue;
}
else
{
HLOGC(cnlog.Debug, log << "RID: socket @" << i->m_iID << " deemed SYNCHRONOUS, NOT UPDATING");
}
}
HLOGC(cnlog.Debug,
log << "updateConnStatus: " << debug_nupd << "/" << debug_nrun << " sockets updated ("
<< (debug_nrun - debug_nupd) << " useless). REMOVED " << debug_nfail << " sockets.");
}
CRcvQueue::CRcvQueue()
: m_WorkerThread()
, m_UnitQueue()
, m_pRcvUList(NULL)
, m_pHash(NULL)
, m_pChannel(NULL)
, m_pTimer(NULL)
, m_iPayloadSize()
, m_bClosing(false)
, m_LSLock()
, m_pListener(NULL)
, m_pRendezvousQueue(NULL)
, m_vNewEntry()
, m_IDLock()
, m_mBuffer()
, m_BufferCond()
{
setupCond(m_BufferCond, "QueueBuffer");
}
CRcvQueue::~CRcvQueue()
{
m_bClosing = true;
if (m_WorkerThread.joinable())
{
HLOGC(rslog.Debug, log << "RcvQueue: EXIT");
m_WorkerThread.join();
}
releaseCond(m_BufferCond);
delete m_pRcvUList;
delete m_pHash;
delete m_pRendezvousQueue;
for (map<int32_t, std::queue<CPacket *> >::iterator i = m_mBuffer.begin(); i != m_mBuffer.end(); ++i)
{
while (!i->second.empty())
{
CPacket *pkt = i->second.front();
delete[] pkt->m_pcData;
delete pkt;
i->second.pop();
}
}
}
#if ENABLE_LOGGING
int CRcvQueue::m_counter = 0;
#endif
void CRcvQueue::init(int qsize, int payload, int version, int hsize, CChannel *cc, CTimer *t)
{
m_iPayloadSize = payload;
m_UnitQueue.init(qsize, payload, version);
m_pHash = new CHash;
m_pHash->init(hsize);
m_pChannel = cc;
m_pTimer = t;
m_pRcvUList = new CRcvUList;
m_pRendezvousQueue = new CRendezvousQueue;
#if ENABLE_LOGGING
++m_counter;
const std::string thrname = "SRT:RcvQ:w" + Sprint(m_counter);
#else
const std::string thrname = "SRT:RcvQ:w";
#endif
if (!StartThread(m_WorkerThread, CRcvQueue::worker, this, thrname.c_str()))
{
throw CUDTException(MJ_SYSTEMRES, MN_THREAD);
}
}
void *CRcvQueue::worker(void *param)
{
CRcvQueue * self = (CRcvQueue *)param;
sockaddr_any sa(self->m_UnitQueue.getIPversion());
int32_t id = 0;
THREAD_STATE_INIT("SRT:RcvQ:worker");
CUnit * unit = 0;
EConnectStatus cst = CONN_AGAIN;
while (!self->m_bClosing)
{
bool have_received = false;
EReadStatus rst = self->worker_RetrieveUnit((id), (unit), (sa));
if (rst == RST_OK)
{
if (id < 0)
{
HLOGC(qrlog.Debug,
log << self->CONID() << "RECEIVED negative socket id '" << id
<< "', rejecting (POSSIBLE ATTACK)");
continue;
}
if (id == 0)
{
cst = self->worker_ProcessConnectionRequest(unit, sa);
}
else
{
cst = self->worker_ProcessAddressedPacket(id, unit, sa);
}
HLOGC(qrlog.Debug, log << self->CONID() << "worker: result for the unit: " << ConnectStatusStr(cst));
if (cst == CONN_AGAIN)
{
HLOGC(qrlog.Debug, log << self->CONID() << "worker: packet not dispatched, continuing reading.");
continue;
}
have_received = true;
}
else if (rst == RST_ERROR)
{
if (self->m_bClosing)
{
HLOGC(qrlog.Debug,
log << self->CONID() << "CChannel reported error, but Queue is closing - INTERRUPTING worker.");
}
else
{
LOGC(qrlog.Fatal,
log << self->CONID()
<< "CChannel reported ERROR DURING TRANSMISSION - IPE. INTERRUPTING worker anyway.");
}
cst = CONN_REJECT;
break;
}
const steady_clock::time_point curtime_minus_syn = steady_clock::now() - microseconds_from(CUDT::COMM_SYN_INTERVAL_US);
CRNode *ul = self->m_pRcvUList->m_pUList;
while ((NULL != ul) && (ul->m_tsTimeStamp < curtime_minus_syn))
{
CUDT *u = ul->m_pUDT;
if (u->m_bConnected && !u->m_bBroken && !u->m_bClosing)
{
u->checkTimers();
self->m_pRcvUList->update(u);
}
else
{
HLOGC(qrlog.Debug,
log << CUDTUnited::CONID(u->m_SocketID) << " SOCKET broken, REMOVING FROM RCV QUEUE/MAP.");
self->m_pHash->remove(u->m_SocketID);
self->m_pRcvUList->remove(u);
u->m_pRNode->m_bOnList = false;
}
ul = self->m_pRcvUList->m_pUList;
}
if (have_received)
{
HLOGC(qrlog.Debug,
log << "worker: RECEIVED PACKET --> updateConnStatus. cst=" << ConnectStatusStr(cst) << " id=" << id
<< " pkt-payload-size=" << unit->m_Packet.getLength());
}
self->m_pRendezvousQueue->updateConnStatus(rst, cst, unit->m_Packet);
}
THREAD_EXIT();
return NULL;
}
EReadStatus CRcvQueue::worker_RetrieveUnit(int32_t& w_id, CUnit*& w_unit, sockaddr_any& w_addr)
{
#if !USE_BUSY_WAITING
m_pTimer->tick();
#endif
while (ifNewEntry())
{
CUDT *ne = getNewEntry();
if (ne)
{
HLOGC(qrlog.Debug,
log << CUDTUnited::CONID(ne->m_SocketID)
<< " SOCKET pending for connection - ADDING TO RCV QUEUE/MAP");
m_pRcvUList->insert(ne);
m_pHash->insert(ne->m_SocketID, ne);
}
}
w_unit = m_UnitQueue.getNextAvailUnit();
if (!w_unit)
{
CPacket temp;
temp.m_pcData = new char[m_iPayloadSize];
temp.setLength(m_iPayloadSize);
THREAD_PAUSED();
EReadStatus rst = m_pChannel->recvfrom((w_addr), (temp));
THREAD_RESUMED();
LOGC(qrlog.Error, log << CONID() << "LOCAL STORAGE DEPLETED. Dropping 1 packet: " << temp.Info());
delete[] temp.m_pcData;
return rst == RST_ERROR ? RST_ERROR : RST_AGAIN;
}
w_unit->m_Packet.setLength(m_iPayloadSize);
THREAD_PAUSED();
EReadStatus rst = m_pChannel->recvfrom((w_addr), (w_unit->m_Packet));
THREAD_RESUMED();
if (rst == RST_OK)
{
w_id = w_unit->m_Packet.m_iID;
HLOGC(qrlog.Debug, log << "INCOMING PACKET: FROM=" << w_addr.str()
<< " BOUND=" << m_pChannel->bindAddressAny().str()
<< " " << w_unit->m_Packet.Info());
}
return rst;
}
EConnectStatus CRcvQueue::worker_ProcessConnectionRequest(CUnit* unit, const sockaddr_any& addr)
{
HLOGC(cnlog.Debug,
log << "Got sockID=0 from " << addr.str()
<< " - trying to resolve it as a connection request...");
int listener_ret = SRT_REJ_UNKNOWN;
bool have_listener = false;
{
ScopedLock cg(m_LSLock);
if (m_pListener)
{
LOGC(cnlog.Note,
log << "PASSING request from: " << addr.str() << " to agent:" << m_pListener->socketID());
listener_ret = m_pListener->processConnectRequest(addr, unit->m_Packet);
have_listener = true;
}
}
if (have_listener) {
LOGC(cnlog.Note,
log << CONID() << "Listener managed the connection request from: " << addr.str()
<< " result:" << RequestTypeStr(UDTRequestType(listener_ret)));
return listener_ret == SRT_REJ_UNKNOWN ? CONN_CONTINUE : CONN_REJECT;
}
return worker_TryAsyncRend_OrStore(0, unit, addr); }
EConnectStatus CRcvQueue::worker_ProcessAddressedPacket(int32_t id, CUnit* unit, const sockaddr_any& addr)
{
CUDT *u = m_pHash->lookup(id);
if (!u)
{
HLOGC(cnlog.Debug, log << "worker_ProcessAddressedPacket: resending to QUEUED socket @" << id);
return worker_TryAsyncRend_OrStore(id, unit, addr);
}
if (addr != u->m_PeerAddr)
{
HLOGC(cnlog.Debug,
log << CONID() << "Packet for SID=" << id << " asoc with " << u->m_PeerAddr.str()
<< " received from " << addr.str() << " (CONSIDERED ATTACK ATTEMPT)");
return CONN_AGAIN;
}
if (!u->m_bConnected || u->m_bBroken || u->m_bClosing)
{
u->m_RejectReason = SRT_REJ_CLOSE;
return CONN_REJECT;
}
if (unit->m_Packet.isControl())
u->processCtrl(unit->m_Packet);
else
u->processData(unit);
u->checkTimers();
m_pRcvUList->update(u);
return CONN_RUNNING;
}
EConnectStatus CRcvQueue::worker_TryAsyncRend_OrStore(int32_t id, CUnit* unit, const sockaddr_any& addr)
{
CUDT *u = m_pRendezvousQueue->retrieve(addr, (id));
if (!u)
{
if (id == 0)
{
HLOGC(cnlog.Debug,
log << CONID() << "AsyncOrRND: no sockets expect connection from " << addr.str()
<< " - POSSIBLE ATTACK, ignore packet");
}
else
{
HLOGC(cnlog.Debug,
log << CONID() << "AsyncOrRND: no sockets expect socket " << id << " from " << addr.str()
<< " - POSSIBLE ATTACK, ignore packet");
}
return CONN_AGAIN; }
if (!u->m_bSynRecving)
{
HLOGC(cnlog.Debug, log << "AsyncOrRND: packet RESOLVED TO @" << id << " -- continuing as ASYNC CONNECT");
EConnectStatus cst = u->processAsyncConnectResponse(unit->m_Packet);
if (cst == CONN_CONFUSED)
{
LOGC(cnlog.Warn, log << "AsyncOrRND: PACKET NOT HANDSHAKE - re-requesting handshake from peer");
storePkt(id, unit->m_Packet.clone());
if (!u->processAsyncConnectRequest(RST_AGAIN, CONN_CONTINUE, unit->m_Packet, u->m_PeerAddr))
{
cst = CONN_REJECT;
}
else
{
cst = CONN_CONTINUE;
}
}
if (cst == CONN_ACCEPT && !unit->m_Packet.isControl())
{
CUDT *ne = getNewEntry(); if (ne)
{
HLOGC(cnlog.Debug,
log << CUDTUnited::CONID(ne->m_SocketID)
<< " SOCKET pending for connection - ADDING TO RCV QUEUE/MAP");
m_pRcvUList->insert(ne);
m_pHash->insert(ne->m_SocketID, ne);
HLOGC(cnlog.Debug,
log << "AsyncOrRND: packet SWITCHED TO CONNECTED with ID=" << id
<< " -- passing to worker_ProcessAddressedPacket");
cst = worker_ProcessAddressedPacket(id, unit, addr);
if (cst == CONN_REJECT)
return cst;
return CONN_ACCEPT; }
else
{
LOGC(cnlog.Error,
log << "IPE: AsyncOrRND: packet SWITCHED TO CONNECTED, but ID=" << id
<< " is still not present in the socket ID dispatch hash - DISREGARDING");
}
}
return cst;
}
HLOGC(cnlog.Debug,
log << "AsyncOrRND: packet RESOLVED TO ID=" << id << " -- continuing through CENTRAL PACKET QUEUE");
storePkt(id, unit->m_Packet.clone());
return CONN_CONTINUE;
}
void CRcvQueue::stopWorker()
{
m_bClosing = true;
if (srt::sync::this_thread::get_id() == m_WorkerThread.get_id())
{
LOGC(rslog.Error, log << "IPE: RcvQ:WORKER TRIES TO CLOSE ITSELF!");
return; }
HLOGC(rslog.Debug, log << "RcvQueue: EXIT (forced)");
m_WorkerThread.join();
}
int CRcvQueue::recvfrom(int32_t id, CPacket& w_packet)
{
UniqueLock bufferlock (m_BufferLock);
CSync buffercond (m_BufferCond, bufferlock);
map<int32_t, std::queue<CPacket *> >::iterator i = m_mBuffer.find(id);
if (i == m_mBuffer.end())
{
THREAD_PAUSED();
buffercond.wait_for(seconds_from(1));
THREAD_RESUMED();
i = m_mBuffer.find(id);
if (i == m_mBuffer.end())
{
w_packet.setLength(-1);
return -1;
}
}
CPacket *newpkt = i->second.front();
if (w_packet.getLength() < newpkt->getLength())
{
w_packet.setLength(-1);
return -1;
}
memcpy((w_packet.m_nHeader), newpkt->m_nHeader, CPacket::HDR_SIZE);
memcpy((w_packet.m_pcData), newpkt->m_pcData, newpkt->getLength());
w_packet.setLength(newpkt->getLength());
delete[] newpkt->m_pcData;
delete newpkt;
i->second.pop();
if (i->second.empty())
m_mBuffer.erase(i);
return (int)w_packet.getLength();
}
int CRcvQueue::setListener(CUDT *u)
{
ScopedLock lslock(m_LSLock);
if (NULL != m_pListener)
return -1;
m_pListener = u;
return 0;
}
void CRcvQueue::removeListener(const CUDT *u)
{
ScopedLock lslock(m_LSLock);
if (u == m_pListener)
m_pListener = NULL;
}
void CRcvQueue::registerConnector(const SRTSOCKET& id, CUDT* u, const sockaddr_any& addr, const steady_clock::time_point& ttl)
{
HLOGC(cnlog.Debug,
log << "registerConnector: adding @" << id << " addr=" << addr.str() << " TTL=" << FormatTime(ttl));
m_pRendezvousQueue->insert(id, u, addr, ttl);
}
void CRcvQueue::removeConnector(const SRTSOCKET &id, bool should_lock)
{
HLOGC(cnlog.Debug, log << "removeConnector: removing @" << id);
m_pRendezvousQueue->remove(id, should_lock);
ScopedLock bufferlock(m_BufferLock);
map<int32_t, std::queue<CPacket *> >::iterator i = m_mBuffer.find(id);
if (i != m_mBuffer.end())
{
HLOGC(cnlog.Debug,
log << "removeConnector: ... and its packet queue with " << i->second.size() << " packets collected");
while (!i->second.empty())
{
delete[] i->second.front()->m_pcData;
delete i->second.front();
i->second.pop();
}
m_mBuffer.erase(i);
}
}
void CRcvQueue::setNewEntry(CUDT *u)
{
HLOGC(cnlog.Debug, log << CUDTUnited::CONID(u->m_SocketID) << "setting socket PENDING FOR CONNECTION");
ScopedLock listguard(m_IDLock);
m_vNewEntry.push_back(u);
}
bool CRcvQueue::ifNewEntry() { return !(m_vNewEntry.empty()); }
CUDT *CRcvQueue::getNewEntry()
{
ScopedLock listguard(m_IDLock);
if (m_vNewEntry.empty())
return NULL;
CUDT *u = (CUDT *)*(m_vNewEntry.begin());
m_vNewEntry.erase(m_vNewEntry.begin());
return u;
}
void CRcvQueue::storePkt(int32_t id, CPacket *pkt)
{
UniqueLock bufferlock (m_BufferLock);
CSync passcond (m_BufferCond, bufferlock);
map<int32_t, std::queue<CPacket *> >::iterator i = m_mBuffer.find(id);
if (i == m_mBuffer.end())
{
m_mBuffer[id].push(pkt);
passcond.signal_locked(bufferlock);
}
else
{
if (i->second.size() > 16)
return;
i->second.push(pkt);
}
}
void CMultiplexer::destroy()
{
delete m_pRcvQueue;
delete m_pSndQueue;
delete m_pTimer;
if (m_pChannel)
{
m_pChannel->close();
delete m_pChannel;
}
}