#include "platform_sys.h"
#include <cstring>
#include <cmath>
#include "buffer.h"
#include "packet.h"
#include "core.h"
#include "logging.h"
using namespace std;
using namespace srt_logging;
using namespace srt::sync;
#if !defined(SRT_MAVG_SAMPLING_RATE)
#define SRT_MAVG_SAMPLING_RATE 40
#endif
bool AvgBufSize::isTimeToUpdate(const time_point& now) const
{
const int usMAvgBasePeriod = 1000000; const int us2ms = 1000;
const int msMAvgPeriod = (usMAvgBasePeriod / SRT_MAVG_SAMPLING_RATE) / us2ms;
const uint64_t elapsed_ms = count_milliseconds(now - m_tsLastSamplingTime); return (elapsed_ms >= msMAvgPeriod);
}
void AvgBufSize::update(const steady_clock::time_point& now, int pkts, int bytes, int timespan_ms)
{
const uint64_t elapsed_ms = count_milliseconds(now - m_tsLastSamplingTime); m_tsLastSamplingTime = now;
const uint64_t one_second_in_ms = 1000;
if (elapsed_ms > one_second_in_ms)
{
m_dCountMAvg = pkts;
m_dBytesCountMAvg = bytes;
m_dTimespanMAvg = timespan_ms;
return;
}
m_dCountMAvg = avg_iir_w<1000, double>(m_dCountMAvg, pkts, elapsed_ms);
m_dBytesCountMAvg = avg_iir_w<1000, double>(m_dBytesCountMAvg, bytes, elapsed_ms);
m_dTimespanMAvg = avg_iir_w<1000, double>(m_dTimespanMAvg, timespan_ms, elapsed_ms);
}
int round_val(double val)
{
return static_cast<int>(round(val));
}
CSndBuffer::CSndBuffer(int size, int mss)
: m_BufLock()
, m_pBlock(NULL)
, m_pFirstBlock(NULL)
, m_pCurrBlock(NULL)
, m_pLastBlock(NULL)
, m_pBuffer(NULL)
, m_iNextMsgNo(1)
, m_iSize(size)
, m_iMSS(mss)
, m_iCount(0)
, m_iBytesCount(0)
, m_iInRatePktsCount(0)
, m_iInRateBytesCount(0)
, m_InRatePeriod(INPUTRATE_FAST_START_US) , m_iInRateBps(INPUTRATE_INITIAL_BYTESPS)
{
m_pBuffer = new Buffer;
m_pBuffer->m_pcData = new char[m_iSize * m_iMSS];
m_pBuffer->m_iSize = m_iSize;
m_pBuffer->m_pNext = NULL;
m_pBlock = new Block;
Block* pb = m_pBlock;
for (int i = 1; i < m_iSize; ++i)
{
pb->m_pNext = new Block;
pb->m_iMsgNoBitset = 0;
pb = pb->m_pNext;
}
pb->m_pNext = m_pBlock;
pb = m_pBlock;
char* pc = m_pBuffer->m_pcData;
for (int i = 0; i < m_iSize; ++i)
{
pb->m_pcData = pc;
pb = pb->m_pNext;
pc += m_iMSS;
}
m_pFirstBlock = m_pCurrBlock = m_pLastBlock = m_pBlock;
setupMutex(m_BufLock, "Buf");
}
CSndBuffer::~CSndBuffer()
{
Block* pb = m_pBlock->m_pNext;
while (pb != m_pBlock)
{
Block* temp = pb;
pb = pb->m_pNext;
delete temp;
}
delete m_pBlock;
while (m_pBuffer != NULL)
{
Buffer* temp = m_pBuffer;
m_pBuffer = m_pBuffer->m_pNext;
delete[] temp->m_pcData;
delete temp;
}
releaseMutex(m_BufLock);
}
void CSndBuffer::addBuffer(const char* data, int len, SRT_MSGCTRL& w_mctrl)
{
int32_t& w_msgno = w_mctrl.msgno;
int32_t& w_seqno = w_mctrl.pktseq;
int64_t& w_srctime = w_mctrl.srctime;
int& w_ttl = w_mctrl.msgttl;
int size = len / m_iMSS;
if ((len % m_iMSS) != 0)
size++;
HLOGC(bslog.Debug,
log << "addBuffer: size=" << m_iCount << " reserved=" << m_iSize << " needs=" << size << " buffers for "
<< len << " bytes");
while (size + m_iCount >= m_iSize)
{
HLOGC(bslog.Debug, log << "addBuffer: ... still lacking " << (size + m_iCount - m_iSize) << " buffers...");
increase();
}
const steady_clock::time_point time = steady_clock::now();
const int32_t inorder = w_mctrl.inorder ? MSGNO_PACKET_INORDER::mask : 0;
HLOGC(bslog.Debug,
log << CONID() << "addBuffer: adding " << size << " packets (" << len << " bytes) to send, msgno="
<< (w_msgno > 0 ? w_msgno : m_iNextMsgNo) << (inorder ? "" : " NOT") << " in order");
Block* s = m_pLastBlock;
if (w_msgno == SRT_MSGNO_NONE) {
HLOGC(bslog.Debug, log << "addBuffer: using internally managed msgno=" << m_iNextMsgNo);
w_msgno = m_iNextMsgNo;
}
else
{
HLOGC(bslog.Debug, log << "addBuffer: OVERWRITTEN by msgno supplied by caller: msgno=" << w_msgno);
m_iNextMsgNo = w_msgno;
}
for (int i = 0; i < size; ++i)
{
int pktlen = len - i * m_iMSS;
if (pktlen > m_iMSS)
pktlen = m_iMSS;
HLOGC(bslog.Debug,
log << "addBuffer: %" << w_seqno << " #" << w_msgno << " spreading from=" << (i * m_iMSS)
<< " size=" << pktlen << " TO BUFFER:" << (void*)s->m_pcData);
memcpy((s->m_pcData), data + i * m_iMSS, pktlen);
s->m_iLength = pktlen;
s->m_iSeqNo = w_seqno;
w_seqno = CSeqNo::incseq(w_seqno);
s->m_iMsgNoBitset = m_iNextMsgNo | inorder;
if (i == 0)
s->m_iMsgNoBitset |= PacketBoundaryBits(PB_FIRST);
if (i == size - 1)
s->m_iMsgNoBitset |= PacketBoundaryBits(PB_LAST);
s->m_llSourceTime_us = w_srctime;
s->m_tsOriginTime = time;
s->m_tsRexmitTime = time_point();
s->m_iTTL = w_ttl;
if (!w_srctime)
w_srctime = count_microseconds(s->m_tsOriginTime.time_since_epoch());
SRT_ASSERT(s->m_pNext);
s = s->m_pNext;
}
m_pLastBlock = s;
enterCS(m_BufLock);
m_iCount += size;
m_iBytesCount += len;
m_tsLastOriginTime = time;
updateInputRate(time, size, len);
updAvgBufSize(time);
leaveCS(m_BufLock);
const int nextmsgno = ++MsgNo(m_iNextMsgNo);
HLOGC(bslog.Debug, log << "CSndBuffer::addBuffer: updating msgno: #" << m_iNextMsgNo << " -> #" << nextmsgno);
m_iNextMsgNo = nextmsgno;
}
void CSndBuffer::setInputRateSmpPeriod(int period)
{
m_InRatePeriod = (uint64_t)period; }
void CSndBuffer::updateInputRate(const steady_clock::time_point& time, int pkts, int bytes)
{
if (m_InRatePeriod == 0)
return;
if (is_zero(m_tsInRateStartTime))
{
m_tsInRateStartTime = time;
return;
}
m_iInRatePktsCount += pkts;
m_iInRateBytesCount += bytes;
const bool early_update = (m_InRatePeriod < INPUTRATE_RUNNING_US) && (m_iInRatePktsCount > INPUTRATE_MAX_PACKETS);
const uint64_t period_us = count_microseconds(time - m_tsInRateStartTime);
if (early_update || period_us > m_InRatePeriod)
{
m_iInRateBytesCount += (m_iInRatePktsCount * CPacket::SRT_DATA_HDR_SIZE);
m_iInRateBps = (int)(((int64_t)m_iInRateBytesCount * 1000000) / period_us);
HLOGC(bslog.Debug,
log << "updateInputRate: pkts:" << m_iInRateBytesCount << " bytes:" << m_iInRatePktsCount
<< " rate=" << (m_iInRateBps * 8) / 1000 << "kbps interval=" << period_us);
m_iInRatePktsCount = 0;
m_iInRateBytesCount = 0;
m_tsInRateStartTime = time;
setInputRateSmpPeriod(INPUTRATE_RUNNING_US);
}
}
int CSndBuffer::addBufferFromFile(fstream& ifs, int len)
{
int size = len / m_iMSS;
if ((len % m_iMSS) != 0)
size++;
HLOGC(bslog.Debug,
log << "addBufferFromFile: size=" << m_iCount << " reserved=" << m_iSize << " needs=" << size
<< " buffers for " << len << " bytes");
while (size + m_iCount >= m_iSize)
{
HLOGC(bslog.Debug,
log << "addBufferFromFile: ... still lacking " << (size + m_iCount - m_iSize) << " buffers...");
increase();
}
HLOGC(bslog.Debug,
log << CONID() << "addBufferFromFile: adding " << size << " packets (" << len
<< " bytes) to send, msgno=" << m_iNextMsgNo);
Block* s = m_pLastBlock;
int total = 0;
for (int i = 0; i < size; ++i)
{
if (ifs.bad() || ifs.fail() || ifs.eof())
break;
int pktlen = len - i * m_iMSS;
if (pktlen > m_iMSS)
pktlen = m_iMSS;
HLOGC(bslog.Debug,
log << "addBufferFromFile: reading from=" << (i * m_iMSS) << " size=" << pktlen
<< " TO BUFFER:" << (void*)s->m_pcData);
ifs.read(s->m_pcData, pktlen);
if ((pktlen = int(ifs.gcount())) <= 0)
break;
s->m_iMsgNoBitset = m_iNextMsgNo | MSGNO_PACKET_INORDER::mask;
if (i == 0)
s->m_iMsgNoBitset |= PacketBoundaryBits(PB_FIRST);
if (i == size - 1)
s->m_iMsgNoBitset |= PacketBoundaryBits(PB_LAST);
s->m_iLength = pktlen;
s->m_iTTL = SRT_MSGTTL_INF;
s = s->m_pNext;
total += pktlen;
}
m_pLastBlock = s;
enterCS(m_BufLock);
m_iCount += size;
m_iBytesCount += total;
leaveCS(m_BufLock);
m_iNextMsgNo++;
if (m_iNextMsgNo == int32_t(MSGNO_SEQ::mask))
m_iNextMsgNo = 1;
return total;
}
steady_clock::time_point CSndBuffer::getSourceTime(const CSndBuffer::Block& block)
{
if (block.m_llSourceTime_us)
{
return steady_clock::time_point() + microseconds_from(block.m_llSourceTime_us);
}
return block.m_tsOriginTime;
}
int CSndBuffer::readData(CPacket& w_packet, steady_clock::time_point& w_srctime, int kflgs)
{
if (m_pCurrBlock == m_pLastBlock)
return 0;
w_packet.m_pcData = m_pCurrBlock->m_pcData;
int readlen = m_pCurrBlock->m_iLength;
w_packet.setLength(readlen);
w_packet.m_iSeqNo = m_pCurrBlock->m_iSeqNo;
if (kflgs == -1)
{
HLOGC(bslog.Debug, log << CONID() << " CSndBuffer: ERROR: encryption required and not possible. NOT SENDING.");
readlen = 0;
}
else
{
m_pCurrBlock->m_iMsgNoBitset |= MSGNO_ENCKEYSPEC::wrap(kflgs);
}
w_packet.m_iMsgNo = m_pCurrBlock->m_iMsgNoBitset;
w_srctime = getSourceTime(*m_pCurrBlock);
m_pCurrBlock = m_pCurrBlock->m_pNext;
HLOGC(bslog.Debug, log << CONID() << "CSndBuffer: extracting packet size=" << readlen << " to send");
return readlen;
}
int32_t CSndBuffer::getMsgNoAt(const int offset)
{
ScopedLock bufferguard(m_BufLock);
Block* p = m_pFirstBlock;
if (p)
{
HLOGC(bslog.Debug,
log << "CSndBuffer::getMsgNoAt: FIRST MSG: size=" << p->m_iLength << " %" << p->m_iSeqNo << " #"
<< p->getMsgSeq() << " !" << BufferStamp(p->m_pcData, p->m_iLength));
}
if (offset >= m_iCount)
{
LOGC(bslog.Error,
log << "CSndBuffer::getMsgNoAt: IPE: offset=" << offset << " not found, max offset=" << m_iCount);
return SRT_MSGNO_CONTROL;
}
int i;
Block* ee SRT_ATR_UNUSED = 0;
for (i = 0; i < offset && p; ++i)
{
ee = p;
p = p->m_pNext;
}
if (!p)
{
LOGC(bslog.Error,
log << "CSndBuffer::getMsgNoAt: IPE: offset=" << offset << " not found, stopped at " << i << " with #"
<< (ee ? ee->getMsgSeq() : SRT_MSGNO_NONE));
return SRT_MSGNO_CONTROL;
}
HLOGC(bslog.Debug,
log << "CSndBuffer::getMsgNoAt: offset=" << offset << " found, size=" << p->m_iLength << " %" << p->m_iSeqNo
<< " #" << p->getMsgSeq() << " !" << BufferStamp(p->m_pcData, p->m_iLength));
return p->getMsgSeq();
}
int CSndBuffer::readData(const int offset, CPacket& w_packet, steady_clock::time_point& w_srctime, int& w_msglen)
{
int32_t& msgno_bitset = w_packet.m_iMsgNo;
ScopedLock bufferguard(m_BufLock);
Block* p = m_pFirstBlock;
for (int i = 0; i < offset; ++i)
p = p->m_pNext;
if ((p->m_iTTL >= 0) && (count_milliseconds(steady_clock::now() - p->m_tsOriginTime) > p->m_iTTL))
{
int32_t msgno = p->getMsgSeq();
w_msglen = 1;
p = p->m_pNext;
bool move = false;
while (msgno == p->getMsgSeq())
{
if (p == m_pCurrBlock)
move = true;
p = p->m_pNext;
if (move)
m_pCurrBlock = p;
w_msglen++;
}
HLOGC(qslog.Debug,
log << "CSndBuffer::readData: due to TTL exceeded, " << w_msglen << " messages to drop, up to " << msgno);
msgno_bitset = msgno;
return -1;
}
w_packet.m_pcData = p->m_pcData;
int readlen = p->m_iLength;
w_packet.setLength(readlen);
w_packet.m_iMsgNo = p->m_iMsgNoBitset;
w_srctime = getSourceTime(*p);
p->m_tsRexmitTime = steady_clock::now();
HLOGC(qslog.Debug,
log << CONID() << "CSndBuffer: getting packet %" << p->m_iSeqNo << " as per %" << w_packet.m_iSeqNo
<< " size=" << readlen << " to send [REXMIT]");
return readlen;
}
srt::sync::steady_clock::time_point CSndBuffer::getPacketRexmitTime(const int offset)
{
ScopedLock bufferguard(m_BufLock);
const Block* p = m_pFirstBlock;
for (int i = 0; i < offset; ++i)
{
SRT_ASSERT(p);
p = p->m_pNext;
}
SRT_ASSERT(p);
return p->m_tsRexmitTime;
}
void CSndBuffer::ackData(int offset)
{
ScopedLock bufferguard(m_BufLock);
bool move = false;
for (int i = 0; i < offset; ++i)
{
m_iBytesCount -= m_pFirstBlock->m_iLength;
if (m_pFirstBlock == m_pCurrBlock)
move = true;
m_pFirstBlock = m_pFirstBlock->m_pNext;
}
if (move)
m_pCurrBlock = m_pFirstBlock;
m_iCount -= offset;
updAvgBufSize(steady_clock::now());
}
int CSndBuffer::getCurrBufSize() const
{
return m_iCount;
}
int CSndBuffer::getAvgBufSize(int& w_bytes, int& w_tsp)
{
ScopedLock bufferguard(m_BufLock);
updAvgBufSize(steady_clock::now());
w_bytes = round_val(m_mavg.bytes());
w_tsp = round_val(m_mavg.timespan_ms());
return round_val(m_mavg.pkts());
}
void CSndBuffer::updAvgBufSize(const steady_clock::time_point& now)
{
if (!m_mavg.isTimeToUpdate(now))
return;
int bytes = 0;
int timespan_ms = 0;
const int pkts = getCurrBufSize((bytes), (timespan_ms));
m_mavg.update(now, pkts, bytes, timespan_ms);
}
int CSndBuffer::getCurrBufSize(int& w_bytes, int& w_timespan)
{
w_bytes = m_iBytesCount;
w_timespan = 0 < m_iCount ? count_milliseconds(m_tsLastOriginTime - m_pFirstBlock->m_tsOriginTime) + 1 : 0;
return m_iCount;
}
int CSndBuffer::dropLateData(int& w_bytes, int32_t& w_first_msgno, const steady_clock::time_point& too_late_time)
{
int dpkts = 0;
int dbytes = 0;
bool move = false;
int32_t msgno = 0;
ScopedLock bufferguard(m_BufLock);
for (int i = 0; i < m_iCount && m_pFirstBlock->m_tsOriginTime < too_late_time; ++i)
{
dpkts++;
dbytes += m_pFirstBlock->m_iLength;
msgno = m_pFirstBlock->getMsgSeq();
if (m_pFirstBlock == m_pCurrBlock)
move = true;
m_pFirstBlock = m_pFirstBlock->m_pNext;
}
if (move)
{
m_pCurrBlock = m_pFirstBlock;
}
m_iCount -= dpkts;
m_iBytesCount -= dbytes;
w_bytes = dbytes;
w_first_msgno = ++MsgNo(msgno);
updAvgBufSize(steady_clock::now());
return (dpkts);
}
void CSndBuffer::increase()
{
int unitsize = m_pBuffer->m_iSize;
Buffer* nbuf = NULL;
try
{
nbuf = new Buffer;
nbuf->m_pcData = new char[unitsize * m_iMSS];
}
catch (...)
{
delete nbuf;
throw CUDTException(MJ_SYSTEMRES, MN_MEMORY, 0);
}
nbuf->m_iSize = unitsize;
nbuf->m_pNext = NULL;
Buffer* p = m_pBuffer;
while (p->m_pNext != NULL)
p = p->m_pNext;
p->m_pNext = nbuf;
Block* nblk = NULL;
try
{
nblk = new Block;
}
catch (...)
{
delete nblk;
throw CUDTException(MJ_SYSTEMRES, MN_MEMORY, 0);
}
Block* pb = nblk;
for (int i = 1; i < unitsize; ++i)
{
pb->m_pNext = new Block;
pb = pb->m_pNext;
}
pb->m_pNext = m_pLastBlock->m_pNext;
m_pLastBlock->m_pNext = nblk;
pb = nblk;
char* pc = nbuf->m_pcData;
for (int i = 0; i < unitsize; ++i)
{
pb->m_pcData = pc;
pb = pb->m_pNext;
pc += m_iMSS;
}
m_iSize += unitsize;
HLOGC(bslog.Debug,
log << "CSndBuffer: BUFFER FULL - adding " << (unitsize * m_iMSS) << " bytes spread to " << unitsize
<< " blocks"
<< " (total size: " << m_iSize << " bytes)");
}
#ifdef SRT_DEBUG_TSBPD_DRIFT
#endif
CRcvBuffer::CRcvBuffer(CUnitQueue* queue, int bufsize_pkts)
: m_pUnit(NULL)
, m_iSize(bufsize_pkts)
, m_pUnitQueue(queue)
, m_iStartPos(0)
, m_iLastAckPos(0)
, m_iMaxPos(0)
, m_iNotch(0)
, m_BytesCountLock()
, m_iBytesCount(0)
, m_iAckedPktsCount(0)
, m_iAckedBytesCount(0)
, m_uAvgPayloadSz(7 * 188)
, m_bTsbPdMode(false)
, m_tdTsbPdDelay(0)
, m_bTsbPdWrapCheck(false)
{
m_pUnit = new CUnit*[m_iSize];
for (int i = 0; i < m_iSize; ++i)
m_pUnit[i] = NULL;
#ifdef SRT_DEBUG_TSBPD_DRIFT
memset(m_TsbPdDriftHisto100us, 0, sizeof(m_TsbPdDriftHisto100us));
memset(m_TsbPdDriftHisto1ms, 0, sizeof(m_TsbPdDriftHisto1ms));
#endif
setupMutex(m_BytesCountLock, "BytesCount");
}
CRcvBuffer::~CRcvBuffer()
{
for (int i = 0; i < m_iSize; ++i)
{
if (m_pUnit[i] != NULL)
{
m_pUnitQueue->makeUnitFree(m_pUnit[i]);
}
}
delete[] m_pUnit;
releaseMutex(m_BytesCountLock);
}
void CRcvBuffer::countBytes(int pkts, int bytes, bool acked)
{
ScopedLock cg(m_BytesCountLock);
if (!acked) {
m_iBytesCount += bytes;
if (bytes > 0)
m_uAvgPayloadSz = ((m_uAvgPayloadSz * (100 - 1)) + bytes) / 100;
}
else {
m_iAckedPktsCount += pkts;
m_iAckedBytesCount += bytes;
if (bytes < 0)
m_iBytesCount += bytes;
}
}
int CRcvBuffer::addData(CUnit* unit, int offset)
{
SRT_ASSERT(unit != NULL);
if (offset >= getAvailBufSize())
return -1;
const int pos = (m_iLastAckPos + offset) % m_iSize;
if (offset >= m_iMaxPos)
m_iMaxPos = offset + 1;
if (m_pUnit[pos] != NULL)
{
HLOGC(qrlog.Debug, log << "addData: unit %" << unit->m_Packet.m_iSeqNo << " rejected, already exists");
return -1;
}
m_pUnit[pos] = unit;
countBytes(1, (int)unit->m_Packet.getLength());
m_pUnitQueue->makeUnitGood(unit);
HLOGC(qrlog.Debug,
log << "addData: unit %" << unit->m_Packet.m_iSeqNo << " accepted, off=" << offset << " POS=" << pos);
return 0;
}
int CRcvBuffer::readBuffer(char* data, int len)
{
int p = m_iStartPos;
int lastack = m_iLastAckPos;
int rs = len;
IF_HEAVY_LOGGING(char* begin = data);
const steady_clock::time_point now = (m_bTsbPdMode ? steady_clock::now() : steady_clock::time_point());
HLOGC(brlog.Debug, log << CONID() << "readBuffer: start=" << p << " lastack=" << lastack);
while ((p != lastack) && (rs > 0))
{
if (m_pUnit[p] == NULL)
{
LOGC(brlog.Error, log << CONID() << " IPE readBuffer on null packet pointer");
return -1;
}
const CPacket& pkt = m_pUnit[p]->m_Packet;
if (m_bTsbPdMode)
{
HLOGC(brlog.Debug,
log << CONID() << "readBuffer: chk if time2play:"
<< " NOW=" << FormatTime(now)
<< " PKT TS=" << FormatTime(getPktTsbPdTime(pkt.getMsgTimeStamp())));
if ((getPktTsbPdTime(pkt.getMsgTimeStamp()) > now))
break;
}
const int pktlen = pkt.getLength();
const int remain_pktlen = pktlen - m_iNotch;
const int unitsize = std::min(remain_pktlen, rs);
HLOGC(brlog.Debug,
log << CONID() << "readBuffer: copying buffer #" << p << " targetpos=" << int(data - begin)
<< " sourcepos=" << m_iNotch << " size=" << unitsize << " left=" << (unitsize - rs));
memcpy((data), pkt.m_pcData + m_iNotch, unitsize);
data += unitsize;
if (rs >= remain_pktlen)
{
freeUnitAt(p);
p = shiftFwd(p);
m_iNotch = 0;
}
else
m_iNotch += rs;
rs -= unitsize;
}
countBytes(-1, -(len - rs), true);
m_iStartPos = p;
return len - rs;
}
int CRcvBuffer::readBufferToFile(fstream& ofs, int len)
{
int p = m_iStartPos;
int lastack = m_iLastAckPos;
int rs = len;
int32_t trace_seq ATR_UNUSED = SRT_SEQNO_NONE;
int trace_shift ATR_UNUSED = -1;
while ((p != lastack) && (rs > 0))
{
#if ENABLE_LOGGING
++trace_shift;
#endif
if (!m_pUnit[p])
{
p = shiftFwd(p);
LOGC(brlog.Error, log << "readBufferToFile: IPE: NULL unit found in file transmission, last good %"
<< trace_seq << " + " << trace_shift);
continue;
}
const CPacket& pkt = m_pUnit[p]->m_Packet;
#if ENABLE_LOGGING
trace_seq = pkt.getSeqNo();
#endif
const int pktlen = pkt.getLength();
const int remain_pktlen = pktlen - m_iNotch;
const int unitsize = std::min(remain_pktlen, rs);
ofs.write(pkt.m_pcData + m_iNotch, unitsize);
if (ofs.fail())
break;
if (rs >= remain_pktlen)
{
freeUnitAt(p);
p = shiftFwd(p);
m_iNotch = 0;
}
else
m_iNotch += rs;
rs -= unitsize;
}
countBytes(-1, -(len - rs), true);
m_iStartPos = p;
return len - rs;
}
int CRcvBuffer::ackData(int len)
{
SRT_ASSERT(len < m_iSize);
SRT_ASSERT(len > 0);
int end = shift(m_iLastAckPos, len);
{
int pkts = 0;
int bytes = 0;
for (int i = m_iLastAckPos; i != end; i = shiftFwd(i))
{
if (m_pUnit[i] == NULL)
continue;
pkts++;
bytes += (int)m_pUnit[i]->m_Packet.getLength();
}
if (pkts > 0)
countBytes(pkts, bytes, true);
}
HLOGC(brlog.Debug,
log << "ackData: shift by " << len << ", start=" << m_iStartPos << " end=" << m_iLastAckPos << " -> " << end);
m_iLastAckPos = end;
m_iMaxPos -= len;
if (m_iMaxPos < 0)
m_iMaxPos = 0;
const int dist = m_iLastAckPos - m_iStartPos;
if (dist < 0)
return dist + m_iSize;
return dist;
}
void CRcvBuffer::skipData(int len)
{
if (m_iStartPos == m_iLastAckPos)
m_iStartPos = (m_iStartPos + len) % m_iSize;
m_iLastAckPos = (m_iLastAckPos + len) % m_iSize;
m_iMaxPos -= len;
if (m_iMaxPos < 0)
m_iMaxPos = 0;
}
size_t CRcvBuffer::dropData(int len)
{
size_t stats_bytes = 0;
int p = m_iStartPos;
int past_q = shift(p, len);
while (p != past_q)
{
if (m_pUnit[p] && m_pUnit[p]->m_iFlag == CUnit::GOOD)
{
stats_bytes += m_pUnit[p]->m_Packet.getLength();
freeUnitAt(p);
}
p = shiftFwd(p);
}
m_iStartPos = past_q;
return stats_bytes;
}
bool CRcvBuffer::getRcvFirstMsg(steady_clock::time_point& w_tsbpdtime,
bool& w_passack,
int32_t& w_skipseqno,
int32_t& w_curpktseq)
{
w_skipseqno = SRT_SEQNO_NONE;
w_passack = false;
if (getRcvReadyMsg((w_tsbpdtime), (w_curpktseq), -1))
{
HLOGC(brlog.Debug, log << "getRcvFirstMsg: ready CONTIG packet: %" << w_curpktseq);
return true;
}
else if (!is_zero(w_tsbpdtime))
{
HLOGC(brlog.Debug, log << "getRcvFirstMsg: packets found, but in future");
return false;
}
bool haslost = false;
w_tsbpdtime = steady_clock::time_point(); w_passack = true;
for (int i = m_iLastAckPos, n = shift(m_iLastAckPos, m_iMaxPos); i != n; i = shiftFwd(i))
{
if (!m_pUnit[i] || m_pUnit[i]->m_iFlag != CUnit::GOOD)
{
haslost = true;
HLOGC(brlog.Debug, log << "getRcvFirstMsg: empty hole at *" << i);
}
else
{
w_tsbpdtime = getPktTsbPdTime(m_pUnit[i]->m_Packet.getMsgTimeStamp());
if (w_tsbpdtime <= steady_clock::now())
{
if (haslost)
{
w_skipseqno = m_pUnit[i]->m_Packet.m_iSeqNo;
w_curpktseq = w_skipseqno;
}
HLOGC(brlog.Debug,
log << "getRcvFirstMsg: found ready packet, nSKIPPED: "
<< ((i - m_iLastAckPos + m_iSize) % m_iSize));
return true;
}
HLOGC(brlog.Debug,
log << "getRcvFirstMsg: found NOT READY packet, nSKIPPED: "
<< ((i - m_iLastAckPos + m_iSize) % m_iSize));
return false;
}
}
HLOGC(brlog.Debug, log << "getRcvFirstMsg: found NO PACKETS");
return false;
}
steady_clock::time_point CRcvBuffer::debugGetDeliveryTime(int offset)
{
int i;
if (offset > 0)
i = shift(m_iStartPos, offset);
else
i = m_iStartPos;
CUnit* u = m_pUnit[i];
if (!u || u->m_iFlag != CUnit::GOOD)
return steady_clock::time_point();
return getPktTsbPdTime(u->m_Packet.getMsgTimeStamp());
}
int32_t CRcvBuffer::getTopMsgno() const
{
if (m_iStartPos == m_iLastAckPos)
return SRT_MSGNO_NONE;
if (!m_pUnit[m_iStartPos])
return SRT_MSGNO_NONE;
return m_pUnit[m_iStartPos]->m_Packet.getMsgSeq();
}
bool CRcvBuffer::getRcvReadyMsg(steady_clock::time_point& w_tsbpdtime, int32_t& w_curpktseq, int upto)
{
const bool havelimit = upto != -1;
int end = -1, past_end = -1;
if (havelimit)
{
int stretch = (m_iSize + m_iStartPos - m_iLastAckPos) % m_iSize;
if (upto > stretch)
{
HLOGC(brlog.Debug, log << "position back " << upto << " exceeds stretch " << stretch);
return false;
}
end = m_iLastAckPos - upto;
if (end < 0)
end += m_iSize;
past_end = shiftFwd(end); HLOGC(brlog.Debug, log << "getRcvReadyMsg: will read from position " << end);
}
IF_HEAVY_LOGGING(const char* reason = "NOT RECEIVED");
for (int i = m_iStartPos, n = m_iLastAckPos; i != n; i = shiftFwd(i))
{
if (havelimit && i == past_end)
break;
bool freeunit = false;
if (m_pUnit[i] == NULL)
{
HLOGC(brlog.Debug,
log << "getRcvReadyMsg: POS=" << i << " +" << ((i - m_iStartPos + m_iSize) % m_iSize)
<< " SKIPPED - no unit there");
m_iStartPos = shiftFwd(m_iStartPos);
continue;
}
w_curpktseq = m_pUnit[i]->m_Packet.getSeqNo();
if (m_pUnit[i]->m_iFlag != CUnit::GOOD)
{
HLOGC(brlog.Debug,
log << "getRcvReadyMsg: POS=" << i << " +" << ((i - m_iStartPos + m_iSize) % m_iSize)
<< " SKIPPED - unit not good");
freeunit = true;
}
else
{
if (!havelimit)
{
w_tsbpdtime = getPktTsbPdTime(m_pUnit[i]->m_Packet.getMsgTimeStamp());
const steady_clock::duration towait = (w_tsbpdtime - steady_clock::now());
if (towait.count() > 0)
{
HLOGC(brlog.Debug,
log << "getRcvReadyMsg: POS=" << i << " +" << ((i - m_iStartPos + m_iSize) % m_iSize)
<< " pkt %" << w_curpktseq << " NOT ready to play (only in " << count_milliseconds(towait)
<< "ms)");
return false;
}
if (m_pUnit[i]->m_Packet.getMsgCryptoFlags() != EK_NOENC)
{
IF_HEAVY_LOGGING(reason = "DECRYPTION FAILED");
freeunit = true;
}
else
{
HLOGC(brlog.Debug,
log << "getRcvReadyMsg: POS=" << i << " +" << ((i - m_iStartPos + m_iSize) % m_iSize)
<< " pkt %" << w_curpktseq << " ready to play (delayed " << count_milliseconds(towait)
<< "ms)");
return true;
}
}
else
{
if (i == end)
{
HLOGC(brlog.Debug, log << "CAUGHT required seq position " << i);
w_tsbpdtime = getPktTsbPdTime(m_pUnit[i]->m_Packet.getMsgTimeStamp());
if (m_pUnit[i]->m_Packet.getMsgCryptoFlags() != EK_NOENC)
{
IF_HEAVY_LOGGING(reason = "DECRYPTION FAILED");
freeunit = true;
}
else
{
HLOGC(brlog.Debug,
log << "getRcvReadyMsg: packet seq=" << w_curpktseq << " ready for extraction");
return true;
}
}
else
{
HLOGC(brlog.Debug, log << "SKIPPING position " << i);
freeunit = true;
}
}
}
if (freeunit)
{
HLOGC(brlog.Debug, log << "getRcvReadyMsg: POS=" << i << " FREED");
const int rmbytes = (int)m_pUnit[i]->m_Packet.getLength();
countBytes(-1, -rmbytes, true);
freeUnitAt(i);
m_iStartPos = shiftFwd(m_iStartPos);
}
}
HLOGC(brlog.Debug, log << "getRcvReadyMsg: nothing to deliver: " << reason);
return false;
}
bool CRcvBuffer::isRcvDataReady(steady_clock::time_point& w_tsbpdtime, int32_t& w_curpktseq, int32_t seqdistance)
{
w_tsbpdtime = steady_clock::time_point();
if (m_bTsbPdMode)
{
const CPacket* pkt = getRcvReadyPacket(seqdistance);
if (!pkt)
{
HLOGC(brlog.Debug, log << "isRcvDataReady: packet NOT extracted.");
return false;
}
w_curpktseq = pkt->getSeqNo();
w_tsbpdtime = getPktTsbPdTime(pkt->getMsgTimeStamp());
if (seqdistance != -1 || w_tsbpdtime <= steady_clock::now())
{
HLOGC(brlog.Debug,
log << "isRcvDataReady: packet extracted seqdistance=" << seqdistance
<< " TsbPdTime=" << FormatTime(w_tsbpdtime));
return true;
}
HLOGC(brlog.Debug, log << "isRcvDataReady: packet extracted, but NOT READY");
return false;
}
return isRcvDataAvailable();
}
CPacket* CRcvBuffer::getRcvReadyPacket(int32_t seqdistance)
{
if (seqdistance != -1)
{
if (seqdistance == 0)
{
LOGC(brlog.Fatal, log << "IPE: trying to extract packet past the last ACK-ed!");
return 0;
}
if (seqdistance > getRcvDataSize())
{
HLOGC(brlog.Debug,
log << "getRcvReadyPacket: Sequence offset=" << seqdistance
<< " is in the past (start=" << m_iStartPos << " end=" << m_iLastAckPos << ")");
return 0;
}
int i = shift(m_iLastAckPos, -seqdistance);
if (m_pUnit[i] && m_pUnit[i]->m_iFlag == CUnit::GOOD)
{
HLOGC(brlog.Debug, log << "getRcvReadyPacket: FOUND PACKET %" << m_pUnit[i]->m_Packet.getSeqNo());
return &m_pUnit[i]->m_Packet;
}
HLOGC(brlog.Debug, log << "getRcvReadyPacket: Sequence offset=" << seqdistance << " IS NOT RECEIVED.");
return 0;
}
IF_HEAVY_LOGGING(int nskipped = 0);
for (int i = m_iStartPos, n = m_iLastAckPos; i != n; i = shiftFwd(i))
{
if (m_pUnit[i] && m_pUnit[i]->m_iFlag == CUnit::GOOD)
{
HLOGC(brlog.Debug,
log << "getRcvReadyPacket: Found next packet seq=%" << m_pUnit[i]->m_Packet.getSeqNo() << " ("
<< nskipped << " empty cells skipped)");
return &m_pUnit[i]->m_Packet;
}
IF_HEAVY_LOGGING(++nskipped);
}
return 0;
}
#if ENABLE_HEAVY_LOGGING
void CRcvBuffer::reportBufferStats() const
{
int nmissing = 0;
int32_t low_seq = SRT_SEQNO_NONE, high_seq = SRT_SEQNO_NONE;
int32_t low_ts = 0, high_ts = 0;
for (int i = m_iStartPos, n = m_iLastAckPos; i != n; i = (i + 1) % m_iSize)
{
if (m_pUnit[i] && m_pUnit[i]->m_iFlag == CUnit::GOOD)
{
low_seq = m_pUnit[i]->m_Packet.m_iSeqNo;
low_ts = m_pUnit[i]->m_Packet.m_iTimeStamp;
break;
}
++nmissing;
}
int n = m_iLastAckPos;
if (m_pUnit[n] && m_pUnit[n]->m_iFlag == CUnit::GOOD)
{
high_ts = m_pUnit[n]->m_Packet.m_iTimeStamp;
high_seq = m_pUnit[n]->m_Packet.m_iSeqNo;
}
else
{
high_ts = low_ts;
}
uint64_t upper_time = high_ts;
uint64_t lower_time = low_ts;
if (lower_time > upper_time)
upper_time += uint64_t(CPacket::MAX_TIMESTAMP) + 1;
int32_t timespan = upper_time - lower_time;
int seqspan = 0;
if (low_seq != SRT_SEQNO_NONE && high_seq != SRT_SEQNO_NONE)
{
seqspan = CSeqNo::seqoff(low_seq, high_seq);
}
LOGC(brlog.Debug,
log << "RCV BUF STATS: seqspan=%(" << low_seq << "-" << high_seq << ":" << seqspan << ") missing=" << nmissing
<< "pkts");
LOGC(brlog.Debug,
log << "RCV BUF STATS: timespan=" << timespan << "us (lo=" << lower_time << " hi=" << upper_time << ")");
}
#endif
bool CRcvBuffer::isRcvDataReady()
{
steady_clock::time_point tsbpdtime;
int32_t seq;
return isRcvDataReady((tsbpdtime), (seq), -1);
}
int CRcvBuffer::getAvailBufSize() const
{
return m_iSize - getRcvDataSize() - 1;
}
int CRcvBuffer::getRcvDataSize() const
{
if (m_iLastAckPos >= m_iStartPos)
return m_iLastAckPos - m_iStartPos;
return m_iSize + m_iLastAckPos - m_iStartPos;
}
int CRcvBuffer::debugGetSize() const
{
int from = m_iStartPos, to = m_iLastAckPos;
int size = to - from;
if (size < 0)
size += m_iSize;
return size;
}
int CRcvBuffer::getRcvAvgDataSize(int& bytes, int& timespan)
{
timespan = round_val(m_mavg.timespan_ms());
bytes = round_val(m_mavg.bytes());
return round_val(m_mavg.pkts());
}
void CRcvBuffer::updRcvAvgDataSize(const steady_clock::time_point& now)
{
if (!m_mavg.isTimeToUpdate(now))
return;
int bytes = 0;
int timespan_ms = 0;
const int pkts = getRcvDataSize(bytes, timespan_ms);
m_mavg.update(now, pkts, bytes, timespan_ms);
}
int CRcvBuffer::getRcvDataSize(int& bytes, int& timespan)
{
timespan = 0;
if (m_bTsbPdMode)
{
int startpos = m_iStartPos;
for (; startpos != m_iLastAckPos; startpos = shiftFwd(startpos))
{
if ((NULL != m_pUnit[startpos]) && (CUnit::GOOD == m_pUnit[startpos]->m_iFlag))
break;
}
int endpos = m_iLastAckPos;
if (m_iLastAckPos != startpos)
{
if ((m_iMaxPos <= 0) || (!m_pUnit[m_iLastAckPos]) || (m_pUnit[m_iLastAckPos]->m_iFlag != CUnit::GOOD))
{
endpos = (m_iLastAckPos == 0 ? m_iSize - 1 : m_iLastAckPos - 1);
}
if ((NULL != m_pUnit[endpos]) && (NULL != m_pUnit[startpos]))
{
const steady_clock::time_point startstamp =
getPktTsbPdTime(m_pUnit[startpos]->m_Packet.getMsgTimeStamp());
const steady_clock::time_point endstamp = getPktTsbPdTime(m_pUnit[endpos]->m_Packet.getMsgTimeStamp());
if (endstamp > startstamp)
timespan = count_milliseconds(endstamp - startstamp);
}
if (0 < m_iAckedPktsCount)
timespan += 1;
}
}
HLOGF(brlog.Debug, "getRcvDataSize: %6d %6d %6d ms\n", m_iAckedPktsCount, m_iAckedBytesCount, timespan);
bytes = m_iAckedBytesCount;
return m_iAckedPktsCount;
}
unsigned CRcvBuffer::getRcvAvgPayloadSize() const
{
return m_uAvgPayloadSz;
}
void CRcvBuffer::dropMsg(int32_t msgno, bool using_rexmit_flag)
{
for (int i = m_iStartPos, n = shift(m_iLastAckPos, m_iMaxPos); i != n; i = shiftFwd(i))
if ((m_pUnit[i] != NULL) && (m_pUnit[i]->m_Packet.getMsgSeq(using_rexmit_flag) == msgno))
m_pUnit[i]->m_iFlag = CUnit::DROPPED;
}
steady_clock::time_point CRcvBuffer::getTsbPdTimeBase(uint32_t timestamp_us)
{
int64_t carryover = 0;
if (m_bTsbPdWrapCheck)
{
if (timestamp_us < TSBPD_WRAP_PERIOD)
{
carryover = int64_t(CPacket::MAX_TIMESTAMP) + 1;
}
else if ((timestamp_us >= TSBPD_WRAP_PERIOD) && (timestamp_us <= (TSBPD_WRAP_PERIOD * 2)))
{
m_bTsbPdWrapCheck = false;
m_tsTsbPdTimeBase += microseconds_from(int64_t(CPacket::MAX_TIMESTAMP) + 1);
LOGC(tslog.Debug,
log << "tsbpd wrap period ends with ts=" << timestamp_us << " - NEW TIME BASE: "
<< FormatTime(m_tsTsbPdTimeBase) << " drift: " << m_DriftTracer.drift() << "us");
}
}
else if (timestamp_us > (CPacket::MAX_TIMESTAMP - TSBPD_WRAP_PERIOD))
{
m_bTsbPdWrapCheck = true;
LOGC(tslog.Debug,
log << "tsbpd wrap period begins with ts=" << timestamp_us << " drift: " << m_DriftTracer.drift()
<< "us.");
}
return (m_tsTsbPdTimeBase + microseconds_from(carryover));
}
void CRcvBuffer::applyGroupTime(const steady_clock::time_point& timebase,
bool wrp,
uint32_t delay,
const steady_clock::duration& udrift)
{
m_bTsbPdMode = true;
m_tsTsbPdTimeBase = timebase;
m_bTsbPdWrapCheck = wrp;
m_tdTsbPdDelay = microseconds_from(delay);
m_DriftTracer.forceDrift(count_microseconds(udrift));
}
void CRcvBuffer::applyGroupDrift(const steady_clock::time_point& timebase,
bool wrp,
const steady_clock::duration& udrift)
{
HLOGC(brlog.Debug,
log << "rcv-buffer: group synch uDRIFT: " << m_DriftTracer.drift() << " -> " << FormatDuration(udrift)
<< " TB: " << FormatTime(m_tsTsbPdTimeBase) << " -> " << FormatTime(timebase));
m_tsTsbPdTimeBase = timebase;
m_bTsbPdWrapCheck = wrp;
m_DriftTracer.forceDrift(count_microseconds(udrift));
}
bool CRcvBuffer::getInternalTimeBase(steady_clock::time_point& w_timebase, steady_clock::duration& w_udrift)
{
w_timebase = m_tsTsbPdTimeBase;
w_udrift = microseconds_from(m_DriftTracer.drift());
return m_bTsbPdWrapCheck;
}
steady_clock::time_point CRcvBuffer::getPktTsbPdTime(uint32_t timestamp)
{
const steady_clock::time_point time_base = getTsbPdTimeBase(timestamp);
HLOGC(brlog.Debug,
log << "getPktTsbPdTime: TIMEBASE=" << FormatTime(time_base) << " + dTS=" << timestamp
<< "us + LATENCY=" << FormatDuration<DUNIT_MS>(m_tdTsbPdDelay) << " + uDRIFT=" << m_DriftTracer.drift());
return (time_base + m_tdTsbPdDelay + microseconds_from(timestamp + m_DriftTracer.drift()));
}
int CRcvBuffer::setRcvTsbPdMode(const steady_clock::time_point& timebase, const steady_clock::duration& delay)
{
m_bTsbPdMode = true;
m_bTsbPdWrapCheck = false;
m_tsTsbPdTimeBase = timebase;
m_tdTsbPdDelay = delay;
return 0;
}
#ifdef SRT_DEBUG_TSBPD_DRIFT
void CRcvBuffer::printDriftHistogram(int64_t iDrift)
{
iDrift /= 100; if (-10 < iDrift && iDrift < 10)
{
m_TsbPdDriftHisto100us[10 + iDrift]++;
}
else
{
iDrift /= 10; if (-10 < iDrift && iDrift < 10)
m_TsbPdDriftHisto1ms[10 + iDrift]++;
else if (iDrift <= -10)
m_TsbPdDriftHisto1ms[0]++;
else
m_TsbPdDriftHisto1ms[20]++;
}
++m_iTsbPdDriftNbSamples;
if ((m_iTsbPdDriftNbSamples % TSBPD_DRIFT_PRT_SAMPLES) == 0)
{
int* histo = m_TsbPdDriftHisto1ms;
fprintf(stderr,
"%4d %4d %4d %4d %4d %4d %4d %4d %4d %4d - %4d + ",
histo[0],
histo[1],
histo[2],
histo[3],
histo[4],
histo[5],
histo[6],
histo[7],
histo[8],
histo[9],
histo[10]);
fprintf(stderr,
"%4d %4d %4d %4d %4d %4d %4d %4d %4d %4d\n",
histo[11],
histo[12],
histo[13],
histo[14],
histo[15],
histo[16],
histo[17],
histo[18],
histo[19],
histo[20]);
histo = m_TsbPdDriftHisto100us;
fprintf(stderr,
" %4d %4d %4d %4d %4d %4d %4d %4d %4d - %4d + ",
histo[1],
histo[2],
histo[3],
histo[4],
histo[5],
histo[6],
histo[7],
histo[8],
histo[9],
histo[10]);
fprintf(stderr,
"%4d %4d %4d %4d %4d %4d %4d %4d %4d\n",
histo[11],
histo[12],
histo[13],
histo[14],
histo[15],
histo[16],
histo[17],
histo[18],
histo[19]);
m_iTsbPdDriftNbSamples = 0;
}
}
void CRcvBuffer::printDriftOffset(int tsbPdOffset, int tsbPdDriftAvg)
{
fprintf(stderr,
"%s: tsbpd offset=%d drift=%d usec\n",
FormatTime(steady_clock::now()).c_str(),
tsbPdOffset,
tsbPdDriftAvg);
memset(m_TsbPdDriftHisto100us, 0, sizeof(m_TsbPdDriftHisto100us));
memset(m_TsbPdDriftHisto1ms, 0, sizeof(m_TsbPdDriftHisto1ms));
}
#endif
bool CRcvBuffer::addRcvTsbPdDriftSample(uint32_t timestamp_us,
Mutex& mutex_to_lock,
steady_clock::duration& w_udrift,
steady_clock::time_point& w_newtimebase)
{
if (!m_bTsbPdMode) return false;
const steady_clock::duration iDrift =
steady_clock::now() - (getTsbPdTimeBase(timestamp_us) + microseconds_from(timestamp_us));
enterCS(mutex_to_lock);
bool updated = m_DriftTracer.update(count_microseconds(iDrift));
#ifdef SRT_DEBUG_TSBPD_DRIFT
printDriftHistogram(count_microseconds(iDrift));
#endif
if (updated)
{
#ifdef SRT_DEBUG_TSBPD_DRIFT
printDriftOffset(m_DriftTracer.overdrift(), m_DriftTracer.drift());
#endif
#if ENABLE_HEAVY_LOGGING
const steady_clock::time_point oldbase = m_tsTsbPdTimeBase;
#endif
steady_clock::duration overdrift = microseconds_from(m_DriftTracer.overdrift());
m_tsTsbPdTimeBase += overdrift;
HLOGC(brlog.Debug,
log << "DRIFT=" << FormatDuration(iDrift) << " AVG=" << (m_DriftTracer.drift() / 1000.0)
<< "ms, TB: " << FormatTime(oldbase) << " EXCESS: " << FormatDuration(overdrift)
<< " UPDATED TO: " << FormatTime(m_tsTsbPdTimeBase));
}
else
{
HLOGC(brlog.Debug,
log << "DRIFT=" << FormatDuration(iDrift) << " TB REMAINS: " << FormatTime(m_tsTsbPdTimeBase));
}
leaveCS(mutex_to_lock);
w_udrift = iDrift;
w_newtimebase = m_tsTsbPdTimeBase;
return updated;
}
int CRcvBuffer::readMsg(char* data, int len)
{
SRT_MSGCTRL dummy = srt_msgctrl_default;
return readMsg(data, len, (dummy), -1);
}
int CRcvBuffer::readMsg(char* data, int len, SRT_MSGCTRL& w_msgctl, int upto)
{
int p = -1, q = -1;
bool passack;
bool empty = accessMsg((p), (q), (passack), (w_msgctl.srctime), upto);
if (empty)
return 0;
CPacket& pkt1 = m_pUnit[p]->m_Packet;
w_msgctl.pktseq = pkt1.getSeqNo();
w_msgctl.msgno = pkt1.getMsgSeq();
return extractData((data), len, p, q, passack);
}
#ifdef SRT_DEBUG_TSBPD_OUTJITTER
void CRcvBuffer::debugTraceJitter(int64_t rplaytime)
{
uint64_t now = CTimer::getTime();
if ((now - rplaytime) / 10 < 10)
m_ulPdHisto[0][(now - rplaytime) / 10]++;
else if ((now - rplaytime) / 100 < 10)
m_ulPdHisto[1][(now - rplaytime) / 100]++;
else if ((now - rplaytime) / 1000 < 10)
m_ulPdHisto[2][(now - rplaytime) / 1000]++;
else
m_ulPdHisto[3][1]++;
}
#endif
bool CRcvBuffer::accessMsg(int& w_p, int& w_q, bool& w_passack, int64_t& w_playtime, int upto)
{
bool empty = true;
if (m_bTsbPdMode)
{
w_passack = false;
int seq = 0;
steady_clock::time_point play_time;
const bool isReady = getRcvReadyMsg(play_time, (seq), upto);
w_playtime = count_microseconds(play_time.time_since_epoch());
if (isReady)
{
empty = false;
w_p = w_q = m_iStartPos;
debugTraceJitter(w_playtime);
}
}
else
{
w_playtime = 0;
if (scanMsg((w_p), (w_q), (w_passack)))
empty = false;
}
return empty;
}
int CRcvBuffer::extractData(char* data, int len, int p, int q, bool passack)
{
SRT_ASSERT(len > 0);
int rs = len > 0 ? len : 0;
const int past_q = shiftFwd(q);
while (p != past_q)
{
const int pktlen = (int)m_pUnit[p]->m_Packet.getLength();
if (pktlen > 0)
countBytes(-1, -pktlen, true);
const int unitsize = ((rs >= 0) && (pktlen > rs)) ? rs : pktlen;
HLOGC(brlog.Debug, log << "readMsg: checking unit POS=" << p);
if (unitsize > 0)
{
memcpy((data), m_pUnit[p]->m_Packet.m_pcData, unitsize);
data += unitsize;
rs -= unitsize;
IF_HEAVY_LOGGING(readMsgHeavyLogging(p));
}
else
{
HLOGC(brlog.Debug, log << CONID() << "readMsg: SKIPPED POS=" << p << " - ZERO SIZE UNIT");
}
if (!passack)
{
HLOGC(brlog.Debug, log << CONID() << "readMsg: FREEING UNIT POS=" << p);
freeUnitAt(p);
}
else
{
HLOGC(brlog.Debug, log << CONID() << "readMsg: PASSACK UNIT POS=" << p);
m_pUnit[p]->m_iFlag = CUnit::PASSACK;
}
p = shiftFwd(p);
}
if (!passack)
m_iStartPos = past_q;
HLOGC(brlog.Debug,
log << "rcvBuf/extractData: begin=" << m_iStartPos << " reporting extraction size=" << (len - rs));
return len - rs;
}
string CRcvBuffer::debugTimeState(size_t first_n_pkts) const
{
stringstream ss;
int ipos = m_iStartPos;
for (size_t i = 0; i < first_n_pkts; ++i, ipos = CSeqNo::incseq(ipos))
{
const CUnit* unit = m_pUnit[ipos];
if (!unit)
{
ss << "pkt[" << i << "] missing, ";
continue;
}
const CPacket& pkt = unit->m_Packet;
ss << "pkt[" << i << "] ts=" << pkt.getMsgTimeStamp() << ", ";
}
return ss.str();
}
#if ENABLE_HEAVY_LOGGING
void CRcvBuffer::readMsgHeavyLogging(int p)
{
static steady_clock::time_point prev_now;
static steady_clock::time_point prev_srctime;
const CPacket& pkt = m_pUnit[p]->m_Packet;
const int32_t seq = pkt.m_iSeqNo;
steady_clock::time_point nowtime = steady_clock::now();
steady_clock::time_point srctime = getPktTsbPdTime(m_pUnit[p]->m_Packet.getMsgTimeStamp());
const int64_t timediff_ms = count_milliseconds(nowtime - srctime);
const int64_t nowdiff_ms = is_zero(prev_now) ? count_milliseconds(nowtime - prev_now) : 0;
const int64_t srctimediff_ms = is_zero(prev_srctime) ? count_milliseconds(srctime - prev_srctime) : 0;
const int next_p = shiftFwd(p);
CUnit* u = m_pUnit[next_p];
string next_playtime;
if (u && u->m_iFlag == CUnit::GOOD)
{
next_playtime = FormatTime(getPktTsbPdTime(u->m_Packet.getMsgTimeStamp()));
}
else
{
next_playtime = "NONE";
}
LOGC(brlog.Debug,
log << CONID() << "readMsg: DELIVERED seq=" << seq << " T=" << FormatTime(srctime) << " in " << timediff_ms
<< "ms - TIME-PREVIOUS: PKT: " << srctimediff_ms << " LOCAL: " << nowdiff_ms << " !"
<< BufferStamp(pkt.data(), pkt.size()) << " NEXT pkt T=" << next_playtime);
prev_now = nowtime;
prev_srctime = srctime;
}
#endif
bool CRcvBuffer::scanMsg(int& w_p, int& w_q, bool& w_passack)
{
if ((m_iStartPos == m_iLastAckPos) && (m_iMaxPos <= 0))
{
HLOGC(brlog.Debug, log << "scanMsg: empty buffer");
return false;
}
int rmpkts = 0;
int rmbytes = 0;
while (m_iStartPos != m_iLastAckPos)
{
if (!m_pUnit[m_iStartPos])
{
if (++m_iStartPos == m_iSize)
m_iStartPos = 0;
continue;
}
if (m_pUnit[m_iStartPos]->m_iFlag == CUnit::GOOD && m_pUnit[m_iStartPos]->m_Packet.getMsgBoundary() & PB_FIRST)
{
bool good = true;
for (int i = m_iStartPos; i != m_iLastAckPos;)
{
if (!m_pUnit[i] || m_pUnit[i]->m_iFlag != CUnit::GOOD)
{
good = false;
break;
}
if (m_pUnit[i]->m_Packet.getMsgBoundary() & PB_LAST)
break;
if (++i == m_iSize)
i = 0;
}
if (good)
break;
}
rmpkts++;
rmbytes += freeUnitAt(m_iStartPos);
m_iStartPos = shiftFwd(m_iStartPos);
}
countBytes(-rmpkts, -rmbytes, true);
w_p = -1; w_q = m_iStartPos; w_passack = m_iStartPos == m_iLastAckPos;
bool found = false;
for (int i = 0, n = m_iMaxPos + getRcvDataSize(); i < n; ++i)
{
if (m_pUnit[w_q] && m_pUnit[w_q]->m_iFlag == CUnit::GOOD)
{
switch (m_pUnit[w_q]->m_Packet.getMsgBoundary())
{
case PB_SOLO: w_p = w_q;
found = true;
break;
case PB_FIRST: w_p = w_q;
break;
case PB_LAST: if (w_p != -1)
found = true;
break;
case PB_SUBSEQUENT:; }
}
else
{
w_p = -1;
}
if (found)
{
if (!w_passack || !m_pUnit[w_q]->m_Packet.getMsgOrderFlag())
{
HLOGC(brlog.Debug, log << "scanMsg: found next-to-broken message, delivering OUT OF ORDER.");
break;
}
found = false;
}
if (++w_q == m_iSize)
w_q = 0;
if (w_q == m_iLastAckPos)
w_passack = true;
}
if (!found)
{
if ((w_p != -1) && (shiftFwd(w_q) == w_p))
{
HLOGC(brlog.Debug, log << "scanMsg: BUFFER FULL and message is INCOMPLETE. Returning PARTIAL MESSAGE.");
found = true;
}
else
{
HLOGC(brlog.Debug, log << "scanMsg: PARTIAL or NO MESSAGE found: p=" << w_p << " q=" << w_q);
}
}
else
{
HLOGC(brlog.Debug,
log << "scanMsg: extracted message p=" << w_p << " q=" << w_q << " ("
<< ((w_q - w_p + m_iSize + 1) % m_iSize) << " packets)");
}
return found;
}