#ifndef INC_SRT_QUEUE_H
#define INC_SRT_QUEUE_H
#include "channel.h"
#include "common.h"
#include "packet.h"
#include "netinet_any.h"
#include "utilities.h"
#include <list>
#include <map>
#include <queue>
#include <vector>
class CUDT;
struct CUnit
{
CPacket m_Packet; enum Flag { FREE = 0, GOOD = 1, PASSACK = 2, DROPPED = 3 };
Flag m_iFlag; };
class CUnitQueue
{
public:
CUnitQueue();
~CUnitQueue();
public:
int init(int size, int mss, int version);
int increase();
int shrink();
public:
int size() const { return m_iSize - m_iCount; }
int capacity() const { return m_iSize; }
public:
CUnit* getNextAvailUnit();
void makeUnitFree(CUnit * unit);
void makeUnitGood(CUnit * unit);
public:
inline int getIPversion() const { return m_iIPversion; }
private:
struct CQEntry
{
CUnit* m_pUnit; char* m_pBuffer; int m_iSize;
CQEntry* m_pNext;
}
*m_pQEntry, *m_pCurrQueue, *m_pLastQueue;
CUnit* m_pAvailUnit;
int m_iSize; int m_iCount;
int m_iMSS; int m_iIPversion;
private:
CUnitQueue(const CUnitQueue&);
CUnitQueue& operator=(const CUnitQueue&);
};
struct CSNode
{
CUDT* m_pUDT; srt::sync::steady_clock::time_point m_tsTimeStamp;
int m_iHeapLoc; };
class CSndUList
{
friend class CSndQueue;
public:
CSndUList();
~CSndUList();
public:
enum EReschedule { DONT_RESCHEDULE = 0, DO_RESCHEDULE = 1 };
static EReschedule rescheduleIf(bool cond) { return cond ? DO_RESCHEDULE : DONT_RESCHEDULE; }
void update(const CUDT* u, EReschedule reschedule);
int pop(sockaddr_any& addr, CPacket& pkt);
void remove(const CUDT* u);
srt::sync::steady_clock::time_point getNextProcTime();
private:
void realloc_();
void insert_(const srt::sync::steady_clock::time_point &ts, const CUDT* u);
void insert_norealloc_(const srt::sync::steady_clock::time_point &ts, const CUDT* u);
void remove_(const CUDT* u);
private:
CSNode** m_pHeap; int m_iArrayLength; int m_iLastEntry;
srt::sync::Mutex m_ListLock;
srt::sync::Mutex* m_pWindowLock;
srt::sync::Condition* m_pWindowCond;
srt::sync::CTimer* m_pTimer;
private:
CSndUList(const CSndUList&);
CSndUList& operator=(const CSndUList&);
};
struct CRNode
{
CUDT* m_pUDT; srt::sync::steady_clock::time_point m_tsTimeStamp;
CRNode* m_pPrev; CRNode* m_pNext;
bool m_bOnList; };
class CRcvUList
{
public:
CRcvUList();
~CRcvUList();
public:
void insert(const CUDT* u);
void remove(const CUDT* u);
void update(const CUDT* u);
public:
CRNode* m_pUList;
private:
CRNode* m_pLast;
private:
CRcvUList(const CRcvUList&);
CRcvUList& operator=(const CRcvUList&);
};
class CHash
{
public:
CHash();
~CHash();
public:
void init(int size);
CUDT* lookup(int32_t id);
void insert(int32_t id, CUDT* u);
void remove(int32_t id);
private:
struct CBucket
{
int32_t m_iID; CUDT* m_pUDT;
CBucket* m_pNext; } **m_pBucket;
int m_iHashSize;
private:
CHash(const CHash&);
CHash& operator=(const CHash&);
};
class CRendezvousQueue
{
public:
CRendezvousQueue();
~CRendezvousQueue();
public:
void insert(const SRTSOCKET& id, CUDT* u, const sockaddr_any& addr,
const srt::sync::steady_clock::time_point &ttl);
void remove(const SRTSOCKET& id, bool should_lock);
CUDT* retrieve(const sockaddr_any& addr, SRTSOCKET& id);
void updateConnStatus(EReadStatus rst, EConnectStatus, const CPacket& response);
private:
struct CRL
{
SRTSOCKET m_iID; CUDT* m_pUDT; sockaddr_any m_PeerAddr; srt::sync::steady_clock::time_point m_tsTTL; };
std::list<CRL> m_lRendezvousID;
srt::sync::Mutex m_RIDVectorLock;
};
class CSndQueue
{
friend class CUDT;
friend class CUDTUnited;
public:
CSndQueue();
~CSndQueue();
public:
std::string CONID() const { return ""; }
void init(CChannel* c, srt::sync::CTimer* t);
int sendto(const sockaddr_any& addr, CPacket& packet);
int getIpTTL() const;
int getIpToS() const;
#ifdef SRT_ENABLE_BINDTODEVICE
bool getBind(char* dst, size_t len) const;
#endif
int ioctlQuery(int type) const { return m_pChannel->ioctlQuery(type); }
int sockoptQuery(int level, int type) const { return m_pChannel->sockoptQuery(level, type); }
void setClosing()
{
m_bClosing = true;
}
private:
static void* worker(void* param);
srt::sync::CThread m_WorkerThread;
private:
CSndUList* m_pSndUList; CChannel* m_pChannel; srt::sync::CTimer* m_pTimer;
srt::sync::Mutex m_WindowLock;
srt::sync::Condition m_WindowCond;
volatile bool m_bClosing;
#if defined(SRT_DEBUG_SNDQ_HIGHRATE)
uint64_t m_ullDbgPeriod;
uint64_t m_ullDbgTime;
struct {
unsigned long lIteration; unsigned long lSleepTo; unsigned long lNotReadyPop; unsigned long lSendTo;
unsigned long lNotReadyTs;
unsigned long lCondWait; } m_WorkerStats;
#endif
#if ENABLE_LOGGING
static int m_counter;
#endif
private:
CSndQueue(const CSndQueue&);
CSndQueue& operator=(const CSndQueue&);
};
class CRcvQueue
{
friend class CUDT;
friend class CUDTUnited;
public:
CRcvQueue();
~CRcvQueue();
public:
std::string CONID() const { return ""; }
void init(int size, int payload, int version, int hsize, CChannel* c, srt::sync::CTimer* t);
int recvfrom(int32_t id, CPacket& to_packet);
void stopWorker();
void setClosing()
{
m_bClosing = true;
}
private:
static void* worker(void* param);
srt::sync::CThread m_WorkerThread;
EReadStatus worker_RetrieveUnit(int32_t& id, CUnit*& unit, sockaddr_any& sa);
EConnectStatus worker_ProcessConnectionRequest(CUnit* unit, const sockaddr_any& sa);
EConnectStatus worker_TryAsyncRend_OrStore(int32_t id, CUnit* unit, const sockaddr_any& sa);
EConnectStatus worker_ProcessAddressedPacket(int32_t id, CUnit* unit, const sockaddr_any& sa);
private:
CUnitQueue m_UnitQueue; CRcvUList* m_pRcvUList; CHash* m_pHash; CChannel* m_pChannel; srt::sync::CTimer* m_pTimer;
int m_iPayloadSize;
volatile bool m_bClosing; #if ENABLE_LOGGING
static int m_counter;
#endif
private:
int setListener(CUDT* u);
void removeListener(const CUDT* u);
void registerConnector(const SRTSOCKET& id, CUDT* u, const sockaddr_any& addr, const srt::sync::steady_clock::time_point& ttl);
void removeConnector(const SRTSOCKET& id, bool should_lock = true);
void setNewEntry(CUDT* u);
bool ifNewEntry();
CUDT* getNewEntry();
void storePkt(int32_t id, CPacket* pkt);
private:
srt::sync::Mutex m_LSLock;
CUDT* m_pListener; CRendezvousQueue* m_pRendezvousQueue;
std::vector<CUDT*> m_vNewEntry; srt::sync::Mutex m_IDLock;
std::map<int32_t, std::queue<CPacket*> > m_mBuffer; srt::sync::Mutex m_BufferLock;
srt::sync::Condition m_BufferCond;
private:
CRcvQueue(const CRcvQueue&);
CRcvQueue& operator=(const CRcvQueue&);
};
struct CMultiplexer
{
CSndQueue* m_pSndQueue; CRcvQueue* m_pRcvQueue; CChannel* m_pChannel; srt::sync::CTimer* m_pTimer;
int m_iPort; int m_iIPversion; int m_iIpTTL;
int m_iIpToS;
#ifdef SRT_ENABLE_BINDTODEVICE
std::string m_BindToDevice;
#endif
int m_iMSS; int m_iRefCount; int m_iIpV6Only; bool m_bReusable;
int m_iID;
CMultiplexer()
: m_pSndQueue(NULL)
, m_pRcvQueue(NULL)
, m_pChannel(NULL)
, m_pTimer(NULL)
{
}
void destroy();
};
#endif