#include "normObject.h"
#include "normSession.h"
#ifndef _WIN32_WCE
#include <fcntl.h>
#include <sys/types.h>
#include <sys/stat.h>
#endif
// Constructs an object of the given type bound to a session and, optionally,
// to the remote sender it is received from.
//
// The new object starts with a reference count of one.  When a sender node is
// supplied, the object adopts that sender's default nacking mode and takes a
// reference on the sender; otherwise NACK_NORMAL is used.
NormObject::NormObject(NormObject::Type      theType,
                       class NormSession&    theSession,
                       class NormSenderNode* theSender,
                       const NormObjectId&   transportId)
 : type(theType), session(theSession), sender(theSender), reference_count(1),
   transport_id(transportId), segment_size(0), pending_info(false), repair_info(false),
   current_block_id(0), next_segment_id(0),
   max_pending_block(0), max_pending_segment(0),
   info_ptr(NULL), info_len(0), first_pass(true), accepted(false), notify_on_update(true),
   user_data(NULL)
#ifndef USE_PROTO_TREE
   , next(NULL)
#endif
{
    if (NULL == theSender)
    {
        // No associated sender node: fall back to the normal nacking mode.
        nacking_mode = NACK_NORMAL;
        return;
    }
    nacking_mode = theSender->GetDefaultNackingMode();
    theSender->Retain();
}
// Closes the object (returning its blocks and destroying its masks) and
// frees the info buffer.
NormObject::~NormObject()
{
    Close();
    // delete[] on a null pointer is a no-op, so no explicit guard is needed.
    delete[] info_ptr;
    info_ptr = NULL;
}
// Adds one reference to this object and, when it has a sender node,
// one reference to that sender as well.
void NormObject::Retain()
{
    ++reference_count;
    if (NULL != sender)
    {
        sender->Retain();
    }
}
// Drops one reference from this object (and from its sender node, if any).
// The object deletes itself once the reference count is zero.
//
// Releasing an object whose count is already zero is logged as an error;
// note that the object is still deleted in that case.
void NormObject::Release()
{
    if (NULL != sender)
    {
        sender->Release();
    }

    if (0 == reference_count)
    {
        PLOG(PL_ERROR, "NormObject::Release() releasing non-retained object?!\n");
    }
    else
    {
        --reference_count;
    }

    // Nothing may touch members after this point when the count hit zero.
    if (0 == reference_count)
    {
        delete this;
    }
}
// Returns the node id of the local node, as reported by the owning session.
NormNodeId NormObject::LocalNodeId() const
{
    const NormNodeId localId = session.LocalNodeId();
    return localId;
}
// Returns the id of the associated sender node, or NORM_NODE_NONE when this
// object has no sender node.
NormNodeId NormObject::GetSenderNodeId() const
{
    if (NULL == sender)
    {
        return NORM_NODE_NONE;
    }
    return sender->GetId();
}
// Prepares the object for transfer: stores/allocates the info buffer, sizes
// the block buffer and the pending/repair masks, and derives the FEC blocking
// layout (large/small block sizes and counts, final block and segment size).
//
// Parameters:
//   objectSize   total object size in bytes
//   infoPtr      optional info content to copy (only used when there is no
//                sender node, i.e. the object has no associated remote sender)
//   infoLen      length of the info content; with a sender node a non-zero
//                value only signals that info is expected
//   segmentSize  maximum payload bytes per segment
//   fecId, fecM  FEC scheme identifier and parameter, used to derive the
//                block id mask
//   numData      source segments per (large) block
//   numParity    parity segments per block
//
// Returns true on success.  On failure the object is left closed (or, for
// info errors, with no info buffer) and false is returned.
bool NormObject::Open(const NormObjectSize& objectSize,
                      const char*           infoPtr,
                      UINT16                infoLen,
                      UINT16                segmentSize,
                      UINT8                 fecId,
                      UINT8                 fecM,
                      UINT16                numData,
                      UINT16                numParity)
{
    if (sender)
    {
        // Object with a sender node: info (if any) arrives later, so only
        // reserve room for it (up to one segment).
        if (infoLen > 0)
        {
            pending_info = true;
            // Release any buffer left over from a previous Open() so it is not leaked.
            delete[] info_ptr;
            info_ptr = NULL;
            if (!(info_ptr = new char[segmentSize]))
            {
                PLOG(PL_FATAL, "NormObject::Open() info allocation error\n");
                return false;
            }
        }
        info_len = 0;
        last_nack_time.GetCurrentTime();
    }
    else
    {
        if (NULL != infoPtr)
        {
            // Drop the old buffer and clear the pointer right away so that none
            // of the early returns below leaves a dangling pointer behind for
            // the destructor to delete a second time.
            delete[] info_ptr;
            info_ptr = NULL;
            if (infoLen > segmentSize)
            {
                PLOG(PL_FATAL, "NormObject::Open() info too big error\n");
                info_len = 0;
                return false;
            }
            if (!(info_ptr = new char[infoLen]))
            {
                PLOG(PL_FATAL, "NormObject::Open() info allocation error\n");
                info_len = 0;
                return false;
            }
            memcpy(info_ptr, infoPtr, infoLen);
            info_len = infoLen;
            pending_info = true;
        }
        else
        {
            // No application info, but the session may still require an INFO
            // message to carry the FEC transport information.
            if (NormSession::FTI_INFO == session.SenderFtiMode())
            {
                pending_info = true;
            }
        }
    }

    // Segmentation / blocking of the object.
    NormObjectSize numSegments = objectSize / NormObjectSize(segmentSize);
    NormObjectSize numBlocks = numSegments / NormObjectSize(numData);

    // Make sure the block count fits the block id space of the FEC scheme
    // (streams only get half of it, minus one).
    fec_block_mask = NormPayloadId::GetFecBlockMask(fecId, fecM);
    UINT32 blockIdMax = fec_block_mask;
    if (IsStream()) blockIdMax = (blockIdMax / 2) - 1;
    if ((numBlocks.MSB() > 0) || (numBlocks.LSB() > blockIdMax))
    {
        PLOG(PL_FATAL, "NormObject::Open() error: object size exceeds FEC blocking and segmentation parameter capability\n");
        Close();
        return false;
    }

    // Only streams use the block id mask for the buffer/masks below.
    if (!IsStream()) fec_block_mask = 0;

    if (!block_buffer.Init(numBlocks.LSB(), 256, fec_block_mask))
    {
        PLOG(PL_FATAL, "NormObject::Open() init block_buffer error\n");
        Close();
        return false;
    }
    if (!pending_mask.Init(numBlocks.LSB(), fec_block_mask))
    {
        PLOG(PL_FATAL, "NormObject::Open() init pending_mask (%lu) error: %s\n",
             (unsigned long)numBlocks.LSB(), GetErrorString());
        Close();
        return false;
    }
    if (!repair_mask.Init(numBlocks.LSB(), fec_block_mask))
    {
        // (message previously named pending_mask by mistake)
        PLOG(PL_FATAL, "NormObject::Open() init repair_mask error\n");
        Close();
        return false;
    }
    repair_mask.Clear();

    if (STREAM == type)
    {
        // Streams use uniform blocks of numData segments.
        small_block_size = large_block_size = numData;
        small_block_count = large_block_count = numBlocks.LSB();
        final_segment_size = segmentSize;
        if (NULL == sender)
        {
            NormStreamObject* stream = static_cast<NormStreamObject*>(this);
            stream->StreamResync(NormBlockId(0));
        }
    }
    else
    {
        // Every block of a non-stream object is initially pending.
        pending_mask.Clear();
        pending_mask.SetBits(0, numBlocks.LSB());

        // Split the segments into "large" blocks followed by "small" blocks
        // that hold one segment less.
        NormObjectSize largeBlockSize;
        if ((0 != numBlocks.MSB()) || (0 != numBlocks.LSB()))
            largeBlockSize = numSegments / numBlocks;
        ASSERT(0 == largeBlockSize.MSB());
        large_block_size = largeBlockSize.LSB();
        if (numSegments == (numBlocks*largeBlockSize))
        {
            small_block_size = large_block_size;
            small_block_count = numBlocks.LSB();
            large_block_count = 0;
        }
        else
        {
            small_block_size = large_block_size - 1;
            NormObjectSize largeBlockCount = numSegments - numBlocks*small_block_size;
            ASSERT(0 == largeBlockCount.MSB());
            large_block_count = largeBlockCount.LSB();
            NormObjectSize smallBlockCount = numBlocks - largeBlockCount;
            ASSERT(0 == smallBlockCount.MSB());
            small_block_count = smallBlockCount.LSB();
        }
        final_block_id = large_block_count + small_block_count - 1;

        // The last segment carries whatever is left after the full ones.
        NormObjectSize finalSegmentSize =
            objectSize - (numSegments - NormObjectSize((UINT32)1))*segmentSize;
        ASSERT(0 == finalSegmentSize.MSB());
        final_segment_size = finalSegmentSize.LSB();
    }

    object_size = objectSize;
    segment_size = segmentSize;
    fec_id = fecId;
    fec_m = fecM;
    ndata = numData;
    nparity = numParity;
    current_block_id = 0;
    next_segment_id = 0;
    max_pending_block = 0;
    max_pending_segment = 0;
    return true;
}
void NormObject::Close()
{
NormBlock* block;
while ((block = block_buffer.Find(block_buffer.RangeLo())))
{
block_buffer.Remove(block);
if (sender)
sender->PutFreeBlock(block);
else
session.SenderPutFreeBlock(block);
}
repair_mask.Destroy();
pending_mask.Destroy();
block_buffer.Destroy();
segment_size = 0;
}
// Sums the bytes still pending over all pending blocks of a non-stream
// object.  Blocks with buffered state report their own pending byte count;
// blocks without state are counted at their full size (large, small, or the
// shorter final block).  Streams, and objects with nothing pending, yield 0.
NormObjectSize NormObject::GetBytesPending() const
{
    NormBlockId blockId;
    if (IsStream() || !GetFirstPending(blockId))
    {
        return NormObjectSize(0);
    }

    // Full-size byte counts for blocks that have no buffered state.
    NormObjectSize largeBlockBytes = NormObjectSize(large_block_size) *
                                     NormObjectSize(segment_size);
    NormObjectSize smallBlockBytes = NormObjectSize(small_block_size) *
                                     NormObjectSize(segment_size);
    NormObjectSize lastBlockBytes = smallBlockBytes - NormObjectSize(segment_size) +
                                    NormObjectSize(final_segment_size);

    NormObjectSize pendingBytes(0);
    bool morePending = true;
    while (morePending)
    {
        NormBlock* blk = block_buffer.Find(blockId);
        if (blk)
        {
            pendingBytes += blk->GetBytesPending(GetBlockSize(blockId), segment_size,
                                                 final_block_id, final_segment_size);
        }
        else if (blockId.GetValue() < large_block_count)
        {
            pendingBytes += largeBlockBytes;
        }
        else if (blockId == final_block_id)
        {
            pendingBytes += lastBlockBytes;
        }
        else
        {
            pendingBytes += smallBlockBytes;
        }
        Increment(blockId);
        morePending = GetNextPending(blockId);
    }
    return pendingBytes;
}
// Processes a request for this object's info.
//
// Nothing happens (and false is returned) when the object has no info to
// send - no info buffer and the session is not in FTI_INFO mode - or when an
// info repair is already recorded.
//
// With holdoff set, info is marked pending unless it already is; without
// holdoff, both the pending and the repair flag are raised.
//
// Returns true when the request increased the amount of repair needed.
bool NormObject::HandleInfoRequest(bool holdoff)
{
    const bool infoAvailable =
        (NULL != info_ptr) || (NormSession::FTI_INFO == session.SenderFtiMode());
    if (!infoAvailable || repair_info)
    {
        return false;
    }

    if (!holdoff)
    {
        pending_info = true;
        repair_info = true;
        return true;
    }

    if (pending_info)
    {
        // Info is already queued, so the request adds nothing.
        return false;
    }
    pending_info = true;
    return true;
}
bool NormObject::HandleBlockRequest(const NormBlockId& firstId, const NormBlockId& lastId)
{
PLOG(PL_TRACE, "NormObject::HandleBlockRequest() node>%lu blk>%lu -> blk>%lu\n",
(unsigned long)LocalNodeId(),
(unsigned long)firstId.GetValue(),
(unsigned long)lastId.GetValue());
bool blockInRange = true;
NormBlockId nextId = firstId;
while (Compare(nextId, lastId) <= 0)
{
if (!repair_mask.Test(nextId.GetValue()))
{
if (!pending_mask.CanSet(nextId.GetValue()))
{
PLOG(PL_ERROR, "NormObject::HandleBlockRequest() pending_mask.CanSet(%lu) error\n",
(unsigned long)nextId.GetValue());
blockInRange = false;
if (IsStream())
static_cast<NormStreamObject*>(this)->UnlockBlock(nextId);
}
else if (!repair_mask.Set(nextId.GetValue()))
{
PLOG(PL_ERROR, "NormObject::HandleBlockRequest() repair_mask.Set(%lu) error\n",
(unsigned long)nextId.GetValue());
blockInRange = false;
if (IsStream())
static_cast<NormStreamObject*>(this)->UnlockBlock(nextId);
}
}
Increment(nextId);
}
return blockInRange;
}
// Resets the transmit state of the object from `firstBlock` onward.
//
// Info is marked pending again when the object has info (or the session is in
// FTI_INFO mode), the pending mask is reset from `firstBlock` if that differs
// from its current content, and every buffered block at or after `firstBlock`
// is reset.  With `requeue` set, the IN_REPAIR flag of those blocks is
// cleared and the object is put back into its first pass.
//
// Returns true when the reset increased the amount of data to transmit.
bool NormObject::TxReset(NormBlockId firstBlock, bool requeue)
{
    bool repairGrew = false;

    if (!pending_info)
    {
        if (HaveInfo() || (NormSession::FTI_INFO == session.SenderFtiMode()))
        {
            repairGrew = true;
            pending_info = true;
        }
    }
    repair_info = false;

    // Use the repair mask as scratch space to find out whether a fresh
    // pending mask would differ from the current one.
    repair_mask.Reset(firstBlock.GetValue());
    repair_mask.Xor(pending_mask);
    const bool maskDiffers = repair_mask.IsSet();
    if (maskDiffers)
    {
        repairGrew = true;
        pending_mask.Reset(firstBlock.GetValue());
    }
    repair_mask.Clear();

    NormBlockBuffer::Iterator blockIter(block_buffer);
    for (NormBlock* blk = blockIter.GetNextBlock(); NULL != blk; blk = blockIter.GetNextBlock())
    {
        NormBlockId id = blk->GetId();
        if (Compare(id, firstBlock) < 0)
        {
            continue;  // before the reset point
        }
        repairGrew |= blk->TxReset(GetBlockSize(id),
                                   nparity,
                                   session.SenderAutoParity(),
                                   segment_size);
        if (requeue)
        {
            blk->ClearFlag(NormBlock::IN_REPAIR);
        }
    }

    if (requeue)
    {
        first_pass = true;
        max_pending_block = 0;
    }
    return repairGrew;
}
bool NormObject::TxResetBlocks(const NormBlockId& firstId, const NormBlockId& lastId)
{
bool increasedRepair = false;
UINT16 autoParity = session.SenderAutoParity();
NormBlockId nextId = firstId;
while (Compare(nextId, lastId) <= 0)
{
if (!pending_mask.Test(nextId.GetValue()))
{
pending_mask.Set(nextId.GetValue());
increasedRepair = true;
}
NormBlock* block = block_buffer.Find(nextId);
if (NULL != block)
increasedRepair |= block->TxReset(GetBlockSize(nextId), nparity, autoParity, segment_size);
Increment(nextId);
}
return increasedRepair;
}
// Applies a segment-range repair update to `theBlock` and, when the block
// reports a change, marks the block pending.
//
// If the pending mask cannot hold the block, nothing is updated; for a stream
// the block is unlocked instead.
//
// Returns true only when the block update took effect.
bool NormObject::TxUpdateBlock(NormBlock*      theBlock,
                               NormSegmentId   firstSegmentId,
                               NormSegmentId   lastSegmentId,
                               UINT16          numErasures)
{
    NormBlockId blockId = theBlock->GetId();

    if (!pending_mask.CanSet(blockId.GetValue()))
    {
        if (IsStream())
        {
            static_cast<NormStreamObject*>(this)->UnlockBlock(blockId);
        }
        return false;
    }

    if (!theBlock->TxUpdate(firstSegmentId, lastSegmentId,
                            GetBlockSize(blockId), nparity,
                            numErasures))
    {
        return false;
    }

    pending_mask.Set(blockId.GetValue());
    return true;
}
bool NormObject::ActivateRepairs()
{
bool repairsActivated = false;
if (repair_info)
{
pending_info = true;
repair_info = false;
repairsActivated = true;
}
NormBlockId nextId;
if (GetFirstRepair(nextId))
{
NormBlockId lastId;
GetLastRepair(lastId); PLOG(PL_DEBUG, "NormObject::ActivateRepairs() node>%lu obj>%hu activating blk>%lu->%lu block repairs ...\n",
(unsigned long)LocalNodeId(),
(UINT16)transport_id,
(unsigned long)nextId.GetValue(),
(unsigned long)lastId.GetValue());
UINT16 autoParity = session.SenderAutoParity();
do
{
if (pending_mask.CanSet(nextId.GetValue()))
{
NormBlock* block = block_buffer.Find(nextId);
if (NULL != block) block->TxReset(GetBlockSize(nextId), nparity, autoParity, segment_size);
pending_mask.Set(nextId.GetValue());
repairsActivated = true;
}
else if (IsStream())
{
static_cast<NormStreamObject*>(this)->UnlockBlock(nextId);
}
repair_mask.Unset(nextId.GetValue());
Increment(nextId);
} while (GetNextRepair(nextId));
ASSERT(!repair_mask.IsSet());
}
NormBlockBuffer::Iterator iterator(block_buffer);
NormBlock* block;
while ((block = iterator.GetNextBlock()))
{
if (block->ActivateRepairs(nparity))
{
PLOG(PL_TRACE, "NormObject::ActivateRepairs() node>%lu obj>%hu activated blk>%lu segment repairs ...\n",
(unsigned long)LocalNodeId(), (UINT16)transport_id, (unsigned long)block->GetId().GetValue());
if (!pending_mask.Set(block->GetId().GetValue()))
{
block->ClearPending();
if (IsStream())
static_cast<NormStreamObject*>(this)->UnlockBlock(block->GetId());
}
else
{
repairsActivated = true;
}
}
}
return repairsActivated;
}
// Appends this object's current repair state to a repair advertisement.
//
// The block id range covered is the union of the repair mask range and the
// block buffer range.  Runs of consecutive whole-block repairs are coalesced
// into ITEMS (one or two blocks) or RANGES (three or more) requests; blocks
// that are not wholly under repair but have buffered segment repairs append
// their own requests via NormBlock::AppendRepairAdv().  If nothing else was
// appended and an info repair is recorded, an INFO-only item is packed.
//
// Returns true if at least one request was appended before the message
// filled up (the INFO-only fallback does not change the return value).
bool NormObject::AppendRepairAdv(NormCmdRepairAdvMsg& cmd)
{
    // Determine the half-open range [nextId, endId) of block ids to visit.
    NormBlockId nextId = 0;
    GetFirstRepair(nextId);
    NormBlockId endId = 0;
    GetLastRepair(endId);
    if (block_buffer.IsEmpty())
    {
        if (repair_mask.IsSet()) Increment(endId);
    }
    else
    {
        NormBlockId lo = block_buffer.RangeLo();
        NormBlockId hi = block_buffer.RangeHi();
        if (repair_mask.IsSet())
        {
            nextId = (Compare(lo, nextId) < 0) ? lo : nextId;
            endId = (Compare(hi, endId) > 0) ? hi : endId;
        }
        else
        {
            nextId = lo;
            endId = hi;
        }
        Increment(endId);
    }

    NormRepairRequest req;
    bool requestAppended = false;
    req.SetFlag(NormRepairRequest::BLOCK);
    if (repair_info) req.SetFlag(NormRepairRequest::INFO);
    NormRepairRequest::Form prevForm = NormRepairRequest::INVALID;
    NormBlockId firstId;      // first block of the current whole-block run
    UINT32 blockCount = 0;    // length of the current whole-block run

    while (Compare(nextId, endId) < 0)
    {
        NormBlockId currentId = nextId;
        Increment(nextId);
        bool repairEntireBlock = repair_mask.Test(currentId.GetValue());
        if (repairEntireBlock)
        {
            if (!blockCount) firstId = currentId;
            blockCount++;
        }

        // Flush the run on a break in continuity or at the end of the range.
        if (blockCount && (!repairEntireBlock || (Compare(nextId, endId) >= 0)))
        {
            // The run is firstId .. firstId + blockCount - 1.  currentId must
            // not be used as its end: when a non-repair block terminated the
            // run, currentId is that block, one past the run.
            NormBlockId lastId = firstId;
            Increment(lastId, blockCount - 1);

            NormRepairRequest::Form form;
            switch (blockCount)
            {
                case 0:
                    form = NormRepairRequest::INVALID;
                    break;
                case 1:
                case 2:
                    form = NormRepairRequest::ITEMS;
                    break;
                default:
                    form = NormRepairRequest::RANGES;
                    break;
            }
            if (form != prevForm)
            {
                // Different form: pack the request built so far and start a new one.
                if (NormRepairRequest::INVALID != prevForm)
                {
                    if (0 == cmd.PackRepairRequest(req))
                    {
                        PLOG(PL_ERROR, "NormObject::AppendRepairAdv() warning: full msg\n");
                        return requestAppended;
                    }
                    requestAppended = true;
                }
                cmd.AttachRepairRequest(req, segment_size);
                req.SetForm(form);
                prevForm = form;
            }
            switch (form)
            {
                case NormRepairRequest::INVALID:
                    ASSERT(0);  // blockCount is non-zero here
                    break;
                case NormRepairRequest::ITEMS:
                    req.AppendRepairItem(fec_id, fec_m, transport_id, firstId, GetBlockSize(firstId), 0);
                    if (2 == blockCount)
                        req.AppendRepairItem(fec_id, fec_m, transport_id, lastId, GetBlockSize(lastId), 0);
                    break;
                case NormRepairRequest::RANGES:
                    req.AppendRepairRange(fec_id, fec_m, transport_id, firstId, GetBlockSize(firstId), 0,
                                          transport_id, lastId, GetBlockSize(lastId), 0);
                    break;
                case NormRepairRequest::ERASURES:
                    break;
            }
            blockCount = 0;
        }

        // Segment-level repairs of a block that is not wholly under repair.
        if (!repairEntireBlock)
        {
            NormBlock* block = block_buffer.Find(currentId);
            if (block && block->IsRepairPending())
            {
                if (NormRepairRequest::INVALID != prevForm)
                {
                    if (0 == cmd.PackRepairRequest(req))
                    {
                        PLOG(PL_ERROR, "NormObject::AppendRepairAdv() warning: full msg\n");
                        return requestAppended;
                    }
                    prevForm = NormRepairRequest::INVALID;
                }
                if (block->AppendRepairAdv(cmd, transport_id, repair_info, fec_id, fec_m, GetBlockSize(currentId), segment_size))
                    requestAppended = true;
                else
                    return requestAppended;
            }
        }
    }

    if (NormRepairRequest::INVALID != prevForm)
    {
        // Pack the request that is still open.
        if (0 == cmd.PackRepairRequest(req))
        {
            PLOG(PL_ERROR, "NormObject::AppendRepairAdv() warning: full msg\n");
            return requestAppended;
        }
        requestAppended = true;
    }
    else if (repair_info && !requestAppended)
    {
        // Nothing but info is under repair: send an INFO-only item.
        // NOTE(review): req is not attached to cmd on this path and the result
        // stays false even when packing succeeds - confirm this is intended.
        req.ClearFlag(NormRepairRequest::BLOCK);
        req.SetForm(NormRepairRequest::ITEMS);
        req.AppendRepairItem(fec_id, fec_m, transport_id, 0, 0, 0);
        if (0 == cmd.PackRepairRequest(req))
        {
            PLOG(PL_ERROR, "NormObject::AppendRepairAdv() warning: full msg\n");
            return requestAppended;
        }
    }
    return requestAppended;
}
// Locates the first (block, segment) position that is under repair.
//
//  - A recorded info repair yields position (0, 0).
//  - Otherwise the first whole-block repair from the repair mask wins if it
//    does not come after the first buffered block with segment repairs; the
//    segment is then 0.
//  - Otherwise the first buffered block with segment repairs is reported
//    together with its first repair segment, clamped to the last source
//    segment of that block.
//
// Returns false (outputs unspecified) when nothing is under repair.
bool NormObject::FindRepairIndex(NormBlockId& blockId, NormSegmentId& segmentId)
{
    if (repair_info)
    {
        blockId = 0;
        segmentId = 0;
        return true;
    }

    // First buffered block (in iteration order) that has repairs pending.
    NormBlockBuffer::Iterator iterator(block_buffer);
    NormBlock* block;
    while ((block = iterator.GetNextBlock()))
    {
        if (block->IsRepairPending()) break;
    }

    if (GetFirstRepair(blockId))
    {
        if ((NULL == block) || (Compare(blockId, block->GetId()) <= 0))
        {
            segmentId = 0;
            return true;
        }
    }

    if (block)
    {
        // Report the block the segment index belongs to; previously blockId
        // was left holding a later repair-mask id (or was not set at all).
        blockId = block->GetId();
        block->GetFirstRepair(segmentId);
        if (segmentId >= GetBlockSize(blockId))
            segmentId = GetBlockSize(blockId) - 1;
        return true;
    }
    return false;
}
bool NormObject::IsRepairPending()
{
if (repair_info) return true;
if (repair_mask.IsSet()) return true;
NormBlockBuffer::Iterator iterator(block_buffer);
NormBlock* block;
while ((block = iterator.GetNextBlock()))
{
if (block->IsRepairPending()) return true;
}
return false;
}
bool NormObject::IsPending(bool flush) const
{
if (pending_info) return true;
if (flush)
{
return pending_mask.IsSet();
}
else
{
NormBlockId firstId;
if (GetFirstPending(firstId))
{
if (Compare(firstId, max_pending_block) < 0)
{
return true;
}
else if (Compare(firstId, max_pending_block) > 0)
{
return false;
}
else
{
if (max_pending_segment > 0)
{
NormBlock* block = block_buffer.Find(max_pending_block);
if (block)
{
NormSegmentId firstPending = 0;
block->GetFirstPending(firstPending);
if (firstPending < max_pending_segment)
return true;
else
return false;
}
else
{
return true;
}
}
else
{
return false;
}
}
}
else
{
return false;
}
}
}
// Checks whether anything at or before position (blockId, segmentId) is
// still pending.
//
// True when info is pending, when the first pending block precedes blockId,
// or when the first pending block is blockId and either has no buffered
// state or its first pending segment is not beyond segmentId.  For streams
// the decision otherwise falls to NormStreamObject::PassiveReadCheck().
bool NormObject::PassiveRepairCheck(NormBlockId    blockId,
                                    NormSegmentId  segmentId)
{
    if (pending_info)
    {
        return true;
    }

    NormBlockId firstPendingBlock;
    if (GetFirstPending(firstPendingBlock))
    {
        if (Compare(firstPendingBlock, blockId) < 0)
        {
            return true;
        }
        if (firstPendingBlock == blockId)
        {
            NormBlock* blk = block_buffer.Find(firstPendingBlock);
            if (NULL == blk)
            {
                // No state for the block at all: all of it is pending.
                return true;
            }
            NormSegmentId firstPendingSegment;
            if (!blk->GetFirstPending(firstPendingSegment))
            {
                // A buffered block is expected to have something pending.
                ASSERT(0);
            }
            else if (firstPendingSegment <= segmentId)
            {
                return true;
            }
        }
    }

    if (IsStream())
    {
        return static_cast<NormStreamObject*>(this)->PassiveReadCheck(blockId, segmentId);
    }
    return false;
}
// Updates the receiver's repair-tracking position for this object and decides
// whether a repair cycle should start.
//
// `level` says how far into the object the given position reaches:
//   THRU_INFO    - the object's info
//   TO_BLOCK     - up to (not including) blockId
//   THRU_SEGMENT - through segmentId of blockId
//   THRU_BLOCK   - through all of blockId
//   THRU_OBJECT  - through the whole object
// (TO_OBJECT never triggers anything; BLIND_CHECK is not expected here.)
//
// Each case first advances max_pending_block / max_pending_segment.  Then:
//  - if `timerActive` and not in `holdoffPhase`, current_block_id /
//    next_segment_id are only moved backwards ("rewound") to the position;
//  - if the timer is not active, current_block_id / next_segment_id are set
//    to the position, and startRepairTimer is raised when info or a block /
//    segment at or before the position is still pending.
//
// When startRepairTimer is raised, the recorded repair state (repair_info,
// repair_mask, and the repairs of buffered pending blocks in the range
// [nextBlockId, endBlockId)) is cleared and true is returned.
// NOTE(review): the return value presumably tells the caller to start its
// repair (NACK) timer - confirm against the caller.
//
// In low-delay mode, reaching the end of a non-stream object aborts the
// object via sender->AbortObject() and returns false.
bool NormObject::ReceiverRepairCheck(CheckLevel level,
NormBlockId blockId,
NormSegmentId segmentId,
bool timerActive,
bool holdoffPhase)
{
// [nextBlockId, endBlockId) is the range whose block repairs get cleared below
NormBlockId nextBlockId = 0;
NormBlockId endBlockId = 0;
bool thruObject = false;
bool startRepairTimer = false;
switch (level)
{
case BLIND_CHECK:
ASSERT(0); return false;
case TO_OBJECT:
return false;
case THRU_INFO:
if (timerActive)
{
if (!holdoffPhase)
{
// rewind to the very start of the object
current_block_id = 0;
next_segment_id = 0;
}
}
else
{
if (pending_info)
startRepairTimer = true;
current_block_id = 0;
next_segment_id = 0;
}
break;
case TO_BLOCK:
if (Compare(blockId, max_pending_block) >= 0)
{
max_pending_block = blockId;
max_pending_segment = 0;
}
if (timerActive)
{
if (!holdoffPhase)
{
// only move the current position backwards
if (Compare(blockId, current_block_id) < 0)
{
current_block_id = blockId;
next_segment_id = 0;
}
}
}
else
{
if (pending_info)
{
startRepairTimer = true;
GetFirstPending(nextBlockId);
endBlockId = blockId;
}
else
{
// start only if some block before blockId is still pending
NormBlockId firstPending;
if (GetFirstPending(firstPending))
{
if (Compare(firstPending, blockId) < 0)
{
startRepairTimer = true;
nextBlockId = firstPending;
endBlockId = blockId;
}
}
}
current_block_id = blockId;
next_segment_id = 0;
}
break;
case THRU_SEGMENT:
if (Compare(blockId, max_pending_block) > 0)
{
max_pending_block = blockId;
max_pending_segment = segmentId + 1;
}
else if (blockId == max_pending_block)
{
if (segmentId >= max_pending_segment)
max_pending_segment = segmentId + 1;
}
// last source segment of the final block => end of a non-stream object
if (!IsStream() && (blockId == final_block_id))
{
unsigned int finalSegment = GetBlockSize(blockId) - 1;
if (finalSegment <= segmentId)
thruObject = true;
}
if (timerActive)
{
if (!holdoffPhase)
{
// only move the current position backwards
if (Compare(blockId, current_block_id) < 0)
{
current_block_id = blockId;
next_segment_id = segmentId + 1;
}
else if (blockId == current_block_id)
{
if (segmentId < next_segment_id)
next_segment_id = segmentId + 1;
}
}
}
else
{
if (pending_info)
{
startRepairTimer = true;
GetFirstPending(nextBlockId);
endBlockId = blockId;
Increment(endBlockId);
}
else
{
NormBlockId firstPending;
if (GetFirstPending(firstPending))
{
if (Compare(firstPending, blockId) < 0)
{
startRepairTimer = true;
}
else if (firstPending == blockId)
{
// same block: start if a segment up to segmentId is pending,
// or if there is no buffered state for the block at all
NormBlock* block = block_buffer.Find(blockId);
if (NULL != block)
{
ASSERT(block->IsPending());
NormSymbolId firstPendingSegment;
block->GetFirstPending(firstPendingSegment);
if (firstPendingSegment <= segmentId)
startRepairTimer = true;
}
else
{
startRepairTimer = true;
}
}
if (startRepairTimer)
{
nextBlockId = firstPending;
endBlockId = blockId;
Increment(endBlockId);
}
}
}
current_block_id = blockId;
next_segment_id = segmentId + 1;
}
break;
case THRU_BLOCK:
if (Compare(blockId, max_pending_block) > 0)
{
max_pending_block = blockId;
max_pending_segment = GetBlockSize(blockId);
}
if (!IsStream() && (blockId == final_block_id))
thruObject = true;
if (timerActive)
{
if (!holdoffPhase)
{
// only move the current position backwards
if (Compare(blockId, current_block_id) < 0)
{
current_block_id = blockId;
next_segment_id = GetBlockSize(blockId);
}
}
}
else
{
if (pending_info)
{
startRepairTimer = true;
GetFirstPending(nextBlockId);
endBlockId = blockId;
Increment(endBlockId);
}
else
{
// start if any block up to and including blockId is still pending
NormBlockId firstPending;
if (GetFirstPending(firstPending))
{
if (Compare(firstPending, blockId) <= 0)
{
startRepairTimer = true;
nextBlockId = firstPending;
endBlockId = blockId;
Increment(endBlockId);
}
}
}
current_block_id = blockId;
next_segment_id = GetBlockSize(blockId);
}
break;
case THRU_OBJECT:
// non-stream objects have a known final block; streams track the
// highest block id seen so far
if (!IsStream())
max_pending_block = final_block_id;
else if (Compare(blockId, max_pending_block) > 0)
max_pending_block = blockId;
max_pending_segment = GetBlockSize(max_pending_block);
thruObject = true;
if (!timerActive)
{
if (pending_info || pending_mask.IsSet())
{
startRepairTimer = true;
GetFirstPending(nextBlockId);
GetLastPending(endBlockId);
Increment(endBlockId);
}
if (IsStream())
current_block_id = max_pending_block;
else
current_block_id = final_block_id;
next_segment_id = GetBlockSize(current_block_id);
}
break;
}
// Low-delay receivers give up on a non-stream object once its end is reached.
// Return immediately afterwards without touching any member.
// NOTE(review): whether AbortObject() can destroy this object is not visible here - confirm.
if (thruObject && session.RcvrIsLowDelay() && !IsStream())
{
sender->AbortObject(this);
return false;
}
if (startRepairTimer)
{
// Start from a clean slate: forget previously recorded repair state for
// the info, the whole-block mask and the pending blocks in range.
repair_info = false; if (repair_mask.IsSet()) repair_mask.Clear();
while (Compare(nextBlockId, endBlockId) < 0)
{
NormBlock* block = block_buffer.Find(nextBlockId);
if (NULL != block) block->ClearRepairs();
Increment(nextBlockId);
if (!GetNextPending(nextBlockId)) break;
}
return true;
}
else
{
return false;
}
}
// Reports whether position (blockId, segmentId) lies behind the current
// repair-check position (current_block_id, next_segment_id): true for an
// earlier block, or for the current block when segmentId + 1 is below
// next_segment_id.
bool NormObject::ReceiverRewindCheck(NormBlockId    blockId,
                                     NormSegmentId  segmentId)
{
    if (Compare(blockId, current_block_id) > 0)
    {
        return false;  // ahead of the current position
    }
    if (blockId == current_block_id)
    {
        const bool behind = ((segmentId+1) < next_segment_id);
        return behind;
    }
    return true;  // an earlier block
}
bool NormObject::IsRepairPending(bool flush)
{
if (pending_info && !repair_info) return true;
repair_mask.XCopy(pending_mask);
NormBlockId nextId;
if (GetFirstRepair(nextId))
{
do
{
if (!flush && (Compare(nextId, current_block_id) > 0)) break;
NormBlock* block = block_buffer.Find(nextId);
if (block)
{
bool isPending;
UINT16 numData = GetBlockSize(nextId);
if (flush || (Compare(nextId, current_block_id) < 0))
{
isPending = block->IsRepairPending(numData, nparity);
}
else
{
if (next_segment_id < numData)
isPending = block->IsRepairPending(next_segment_id, 0);
else
isPending = block->IsRepairPending(numData, nparity);
}
if (isPending) return true;
}
else
{
return true; }
Increment(nextId);
} while (GetNextRepair(nextId));
}
return false;
}
// Appends repair requests for this object's pending content to a NACK.
//
// Pending blocks are visited in order (limited to max_pending_block unless
// `flush` is set).  Consecutive pending blocks that have no buffered state
// are collected into a run and requested as whole blocks: ITEMS for a run of
// one or two blocks, RANGES for three or more.  A pending block that does
// have buffered state ends the current run and appends its own segment-level
// request through NormBlock::AppendRepairRequest().  If nothing was appended
// but info is pending, an INFO-only item is sent.
//
// Requests are only packed when nacking_mode is not NACK_NONE.
//
// Parameters:
//   nack        message the requests are attached to and packed into
//   flush       request through the end of the pending data instead of only
//               up to max_pending_block / max_pending_segment
//   payloadMax  passed to AttachRepairRequest() and to the block-level call
//
// Returns true if at least one request was appended; returns early (with the
// result so far) when the message is full.
bool NormObject::AppendRepairRequest(NormNackMsg& nack,
bool flush,
UINT16 payloadMax)
{
NormRepairRequest req;
bool requestAppended = false; NormRepairRequest::Form prevForm = NormRepairRequest::INVALID;
NormBlockId nextId;
bool iterating = GetFirstPending(nextId);
// prevId marks the first block of the current run of state-less blocks
NormBlockId prevId = nextId;
iterating = iterating && (flush || (Compare(nextId, max_pending_block) <= 0));
// number of blocks in the current run
UINT32 consecutiveCount = 0;
// Keep going after iteration ends until the last run has been flushed.
while (iterating || (0 != consecutiveCount))
{
NormBlockId lastId;
GetLastPending(lastId); if (PL_TRACE <= GetDebugLevel())
{
PLOG(PL_TRACE, "NormObject::AppendRepairRequest() node>%lu obj>%hu, blk>%lu->%lu ",
(unsigned long)LocalNodeId(), (UINT16)GetId(),
(unsigned long)nextId.GetValue(), (unsigned long)lastId.GetValue());
PLOG(PL_ALWAYS, "(maxPending = %lu)\n", (unsigned long)max_pending_block.GetValue());
}
// Decide whether the current run must be flushed: a buffered block ends
// it, a block directly following the run extends it, anything else
// (a gap, or the end of iteration) ends it.
bool appendRequest = false;
NormBlock* block = iterating ? block_buffer.Find(nextId) : NULL;
if (NULL != block)
appendRequest = true;
else if (iterating && ((UINT32)Difference(nextId, prevId) == consecutiveCount))
consecutiveCount++;
else
appendRequest = true;
if (appendRequest)
{
// The run length selects the request form.
NormRepairRequest::Form nextForm;
switch(consecutiveCount)
{
case 0:
nextForm = NormRepairRequest::INVALID;
break;
case 1:
case 2:
nextForm = NormRepairRequest::ITEMS;
break;
default:
nextForm = NormRepairRequest::RANGES;
break;
} if (prevForm != nextForm)
{
// Form changed: pack the open request (if any) and attach a new one.
if ((NormRepairRequest::INVALID != prevForm) &&
(NACK_NONE != nacking_mode))
{
if (0 == nack.PackRepairRequest(req))
{
PLOG(PL_WARN, "NormObject::AppendRepairRequest() warning: full NACK msg\n");
return requestAppended;
}
requestAppended = true;
}
if (NormRepairRequest::INVALID != nextForm)
{
nack.AttachRepairRequest(req, payloadMax);
req.SetForm(nextForm);
req.ResetFlags();
if (NACK_NORMAL == nacking_mode)
req.SetFlag(NormRepairRequest::BLOCK);
if (pending_info)
req.SetFlag(NormRepairRequest::INFO);
}
prevForm = nextForm;
}
if (NormRepairRequest::INVALID != nextForm)
PLOG(PL_TRACE, "NormObject::AppendRepairRequest() BLOCK request\n");
// Emit the run that starts at prevId.
switch (nextForm)
{
case NormRepairRequest::ITEMS:
req.AppendRepairItem(fec_id, fec_m, transport_id, prevId, GetBlockSize(prevId), 0); if (2 == consecutiveCount)
{
Increment(prevId);
req.AppendRepairItem(fec_id, fec_m, transport_id, prevId, GetBlockSize(prevId), 0); }
break;
case NormRepairRequest::RANGES:
{
// range end = run start + (run length - 1)
NormBlockId endId = prevId;
Increment(endId, consecutiveCount - 1);
req.AppendRepairRange(fec_id, fec_m,
transport_id, prevId, GetBlockSize(prevId), 0,
transport_id, endId, GetBlockSize(endId), 0); break;
}
default:
break;
} if (NULL != block)
{
// Buffered block: request at segment level.  Within
// max_pending_block only segments below max_pending_segment count.
bool blockIsPending = false;
if (nextId == max_pending_block)
{
ASSERT(block->IsPending());
NormSymbolId firstPending;
block->GetFirstPending(firstPending);
if (firstPending < max_pending_segment) blockIsPending = true;
}
else
{
blockIsPending = true;
}
if (blockIsPending && (NACK_NONE != nacking_mode))
{
UINT16 numData = GetBlockSize(nextId);
// Pack the open whole-block request before the block adds its own.
if (NormRepairRequest::INVALID != prevForm)
{
if (0 == nack.PackRepairRequest(req))
{
PLOG(PL_WARN, "NormObject::AppendRepairRequest() warning: full NACK msg\n");
return requestAppended;
}
requestAppended = true;
}
bool blockRequestAppended;
if (flush || (nextId != max_pending_block))
{
// whole block: all source segments plus parity
blockRequestAppended =
block->AppendRepairRequest(nack, fec_id, fec_m, numData, nparity,
transport_id, pending_info, payloadMax);
}
else
{
if (max_pending_segment < numData)
{
// only the source segments seen so far, no parity
blockRequestAppended =
block->AppendRepairRequest(nack, fec_id, fec_m, max_pending_segment, 0,
transport_id, pending_info, payloadMax);
}
else
{
blockRequestAppended =
block->AppendRepairRequest(nack, fec_id, fec_m, numData, nparity,
transport_id, pending_info, payloadMax);
}
}
if (blockRequestAppended)
requestAppended = true;
else return requestAppended;
prevForm = NormRepairRequest::INVALID;
}
consecutiveCount = 0;
}
else if (iterating)
{
// a gap ended the run; nextId starts a new run of one
consecutiveCount = 1;
}
else
{
consecutiveCount = 0; }
prevId = nextId;
} Increment(nextId);
iterating = GetNextPending(nextId);
iterating = iterating && (flush || (Compare(nextId, max_pending_block) <= 0));
}
// Pack whatever request is still open.
if ((NormRepairRequest::INVALID != prevForm) && (NACK_NONE != nacking_mode))
{
if (0 == nack.PackRepairRequest(req))
{
PLOG(PL_WARN, "NormObject::AppendRepairRequest() warning: full NACK msg\n");
return requestAppended;
}
requestAppended = true;
prevForm = NormRepairRequest::INVALID;
}
// Nothing but info is pending: request the info alone.
if (!requestAppended && pending_info && (NACK_NONE != nacking_mode))
{
nack.AttachRepairRequest(req, payloadMax);
req.SetForm(NormRepairRequest::ITEMS);
req.ResetFlags();
req.SetFlag(NormRepairRequest::INFO);
req.AppendRepairItem(fec_id, fec_m, transport_id, 0, 0, 0); if (0 == nack.PackRepairRequest(req))
{
PLOG(PL_WARN, "NormObject::AppendRepairRequest() warning: full NACK msg\n");
return requestAppended;
}
requestAppended = true;
}
return requestAppended;
}
// Processes a received INFO or DATA message for this object.
//
// INFO: while info is pending, the info content is stored (truncated to
// segment_size with a warning if longer), pending_info is cleared and
// RX_OBJECT_INFO is posted.  Zero-length info releases the info buffer
// without a notification.
//
// DATA: for streams the stream state is first brought up to blockId, dropping
// the oldest pending blocks (and finally resyncing) if the stream is broken.
// If the block and the segment are still pending, the segment is cached in
// the block when a buffer is available, source segments are written to the
// object, and once the received parity covers the remaining erasures the
// block is decoded, the recovered source segments are written, and the block
// is returned to the sender's pool.  RX_OBJECT_UPDATED is posted when new
// data was written and update notification is armed.
//
// Parameters:
//   msg        the received message (NormInfoMsg or NormDataMsg)
//   msgType    NormMsg::INFO selects info handling; anything else is data
//   blockId    FEC block of the data segment
//   segmentId  segment (symbol) index within the block
void NormObject::HandleObjectMessage(const NormObjectMsg& msg,
                                     NormMsg::Type        msgType,
                                     NormBlockId          blockId,
                                     NormSegmentId        segmentId)
{
    if (NormMsg::INFO == msgType)
    {
        if (pending_info)
        {
            const NormInfoMsg& infoMsg = (const NormInfoMsg&)msg;
            info_len = infoMsg.GetInfoLen();
            if (info_len > segment_size)
            {
                // Truncate to what the buffer can hold and carry on, so that
                // info_len never describes bytes that were not stored.
                info_len = segment_size;
                PLOG(PL_WARN, "NormObject::HandleObjectMessage() node>%lu sender>%lu obj>%hu "
                              "Warning! info too long.\n", (unsigned long)LocalNodeId(),
                              (unsigned long)sender->GetId(), (UINT16)transport_id);
            }
            if (0 == info_len)
            {
                if (NULL != info_ptr)
                {
                    delete[] info_ptr;
                    info_ptr = NULL;
                }
                pending_info = false;
            }
            else
            {
                memcpy(info_ptr, infoMsg.GetInfo(), info_len);
                pending_info = false;
                session.Notify(NormController::RX_OBJECT_INFO, sender, this);
            }
        }
        else
        {
            PLOG(PL_DEBUG, "NormObject::HandleObjectMessage() node>%lu sender>%lu obj>%hu "
                           "received duplicate info ...\n",
                           (unsigned long)LocalNodeId(), (unsigned long)sender->GetId(),
                           (UINT16)transport_id);
        }
    }
    else
    {
        const NormDataMsg& data = (const NormDataMsg&)msg;
        UINT16 numData = GetBlockSize(blockId);

        // For streams, bring the stream window up to this block.
        NormStreamObject* stream = NULL;
        if (STREAM == type)
        {
            stream = static_cast<NormStreamObject*>(this);
            if (!stream->StreamUpdateStatus(blockId))
            {
                PLOG(PL_WARN, "NormObject::HandleObjectMessage() node:%lu sender:%lu obj>%hu blk>%lu "
                              "broken stream ...\n", (unsigned long)LocalNodeId(), (unsigned long)sender->GetId(),
                              (UINT16)transport_id, (unsigned long)blockId.GetValue());
                sender->IncrementResyncCount();
                // Drop the oldest pending blocks until the update succeeds;
                // resync outright when nothing is left to drop.
                while (!stream->StreamUpdateStatus(blockId))
                {
                    NormBlockId firstId;
                    if (GetFirstPending(firstId))
                    {
                        NormBlock* block = block_buffer.Find(firstId);
                        if (block)
                        {
                            block_buffer.Remove(block);
                            sender->PutFreeBlock(block);
                        }
                        pending_mask.Unset(firstId.GetValue());
                    }
                    else
                    {
                        stream->StreamResync(blockId);
                        break;
                    }
                }
            }
        }

        if (pending_mask.Test(blockId.GetValue()))
        {
            // Get (or create) the buffered state for this block.
            NormBlock* block = block_buffer.Find(blockId);
            if (!block)
            {
                if (!(block = sender->GetFreeBlock(transport_id, blockId)))
                {
                    return;
                }
                block->RxInit(blockId, numData, nparity);
                block_buffer.Insert(block);
            }
            if (block->IsPending(segmentId))
            {
                UINT16 segmentLength = data.GetPayloadDataLength();
                if (segmentLength > segment_size)
                {
                    PLOG(PL_ERROR, "NormObject::HandleObjectMessage() node>%lu sender>%lu obj>%hu "
                                   "Error! segment too large ...\n", (unsigned long)LocalNodeId(),
                                   (unsigned long)sender->GetId(),(UINT16)transport_id);
                    return;
                }
                UINT16 payloadLength = data.GetPayloadLength();
                UINT16 payloadMax = segment_size + NormDataMsg::GetStreamPayloadHeaderLength();
#ifdef SIMULATE
                payloadMax = MIN(payloadMax, SIM_PAYLOAD_MAX);
                payloadLength = MIN(payloadLength, SIM_PAYLOAD_MAX);
#endif
                // Cache the segment.  Parity always needs a buffer; a source
                // segment is only cached while the segment pool is not empty.
                bool isSourceSymbol = (segmentId < numData);
                char* segment = (!isSourceSymbol || !sender->SegmentPoolIsEmpty()) ?
                                    sender->GetFreeSegment(transport_id, blockId) : NULL;
                if (segment)
                {
                    memcpy(segment, data.GetPayload(), payloadLength);
                    // zero-pad up to the full payload size
                    if (payloadLength < payloadMax)
                        memset(segment+payloadLength, 0, payloadMax-payloadLength);
                    block->AttachSegment(segmentId, segment);
                }
                else
                {
                    // Uncached parity is useless; uncached source data can
                    // still be written to the object below.
                    if (!isSourceSymbol) return;
                }
                block->UnsetPending(segmentId);

                bool objectUpdated = false;
                if (isSourceSymbol)
                {
                    block->DecrementErasureCount();
                    if (WriteSegment(blockId, segmentId, data.GetPayload()))
                    {
                        objectUpdated = true;
                        sender->IncrementRecvGoodput(segmentLength);
                    }
                    else
                    {
                        if (IsStream())
                            PLOG(PL_DEBUG, "NormObject::HandleObjectMessage() WriteSegment() error\n");
                        else
                            PLOG(PL_ERROR, "NormObject::HandleObjectMessage() WriteSegment() error\n");
                    }
                }
                else
                {
                    block->IncrementParityCount();
                }

                // Enough parity to cover the remaining erasures: finish the block.
                if (block->ErasureCount() <= block->ParityCount())
                {
                    PLOG(PL_DETAIL, "NormObject::HandleObjectMessage() node>%lu sender>%lu obj>%hu blk>%lu "
                                    "completed block ...\n", (unsigned long)LocalNodeId(),
                                    (unsigned long)sender->GetId(), (UINT16)transport_id,
                                    (unsigned long)block->GetId().GetValue());
                    UINT16 erasureCount = 0;
                    UINT16 nextErasure = 0;
                    UINT16 retrievalCount = 0;
                    if (block->GetFirstPending(nextErasure))
                    {
                        // Decoding is only needed if a source segment is missing.
                        if (nextErasure < numData)
                        {
                            for (UINT16 nextSegment = 0; nextSegment < numData; nextSegment++)
                            {
                                if (block->IsPending(nextSegment))
                                {
                                    // Missing source segment: provide a zeroed
                                    // buffer for the decoder to fill.
                                    sender->SetErasureLoc(erasureCount++, nextSegment);
                                    segment = sender->GetRetrievalSegment();
                                    ASSERT(NULL != segment);
                                    UINT16 retrievalMax = segment_size + NormDataMsg::GetStreamPayloadHeaderLength();
#ifdef SIMULATE
                                    retrievalMax = MIN(retrievalMax, SIM_PAYLOAD_MAX);
#endif
                                    // This statement used to sit on the #endif
                                    // line, where the preprocessor discards it.
                                    memset(segment, 0, retrievalMax);
                                    sender->SetRetrievalLoc(retrievalCount++, nextSegment);
                                    block->SetSegment(nextSegment, segment);
                                }
                                else if (!block->GetSegment(nextSegment))
                                {
                                    // Received earlier but not cached: read it
                                    // back from the object.
                                    if (!(segment = RetrieveSegment(blockId, nextSegment)))
                                    {
                                        // Could not read it back: treat the
                                        // segment as missing again and undo
                                        // the temporary attachments.
                                        ASSERT(IsStream());
                                        block->SetPending(nextSegment);
                                        block->IncrementErasureCount();
                                        for (UINT16 i = 0; i < retrievalCount; i++)
                                            block->DetachSegment(sender->GetRetrievalLoc(i));
                                        return;
                                    }
                                    sender->SetRetrievalLoc(retrievalCount++, nextSegment);
                                    block->SetSegment(nextSegment, segment);
                                }
                            }
                            // Missing parity segments are erasures, too.
                            nextErasure = numData;
                            while (block->GetNextPending(nextErasure))
                                sender->SetErasureLoc(erasureCount++, nextErasure++);
                        }
                    }
                    if (erasureCount)
                    {
                        sender->Decode(block->SegmentList(), numData, erasureCount);
                        // Write the recovered source segments (erasure
                        // locations are listed source-first).
                        for (UINT16 i = 0; i < erasureCount; i++)
                        {
                            NormSegmentId sid = sender->GetErasureLoc(i);
                            if (sid < numData)
                            {
                                if (WriteSegment(blockId, sid, block->GetSegment(sid)))
                                {
                                    objectUpdated = true;
                                    sender->IncrementRecvGoodput(segmentLength);
                                }
                                else
                                {
                                    if (IsStream())
                                        PLOG(PL_DEBUG, "NormObject::HandleObjectMessage() WriteSegment() error\n");
                                    else
                                        PLOG(PL_ERROR, "NormObject::HandleObjectMessage() WriteSegment() error\n");
                                }
                            }
                            else
                            {
                                break;
                            }
                        }
                    }
                    // Detach the temporary retrieval buffers, then retire the block.
                    for (UINT16 i = 0; i < retrievalCount; i++)
                        block->DetachSegment(sender->GetRetrievalLoc(i));
                    pending_mask.Unset(blockId.GetValue());
                    block_buffer.Remove(block);
                    sender->PutFreeBlock(block);
                }

                if (objectUpdated && notify_on_update)
                {
                    if ((NULL == stream) || stream->DetermineReadReadiness() || session.RcvrIsLowDelay())
                    {
                        notify_on_update = false;
                        session.Notify(NormController::RX_OBJECT_UPDATED, sender, this);
                    }
                }
            }
            else
            {
                PLOG(PL_DEBUG, "NormObject::HandleObjectMessage() node>%lu sender>%lu obj>%hu "
                               "received duplicate segment blk>%lu segment>%hu...\n", (unsigned long)LocalNodeId(),
                               (unsigned long)sender->GetId(), (UINT16)transport_id,(unsigned long)blockId.GetValue(),
                               segmentId);
            }
        }
        else
        {
            PLOG(PL_DEBUG, "NormObject::HandleObjectMessage() node>%lu sender>%lu obj>%hu "
                           "received duplicate block message blk>%lu ...\n", (unsigned long)LocalNodeId(),
                           (unsigned long)sender->GetId(), (UINT16)transport_id, (unsigned long)blockId.GetValue());
        }
    }
}
// Visits the buffered blocks in iterator order.  For the first block that
// still has at least one source (data) segment attached, every source
// segment of that block is detached and handed to "segmentPool".
// Returns true as soon as one block yielded segments, false when none did.
bool NormObject::ReclaimSourceSegments(NormSegmentPool& segmentPool)
{
    NormBlockBuffer::Iterator blockIterator(block_buffer);
    for (NormBlock* blk = blockIterator.GetNextBlock();
         NULL != blk;
         blk = blockIterator.GetNextBlock())
    {
        bool gotSegment = false;
        UINT16 sourceCount = GetBlockSize(blk->GetId());
        for (UINT16 index = 0; index < sourceCount; index++)
        {
            char* seg = blk->DetachSegment(index);
            if (NULL == seg) continue;  // nothing attached at this index
            segmentPool.Put(seg);
            gotSegment = true;
        }
        // Stop after the first block that actually gave something back
        if (gotSegment) return true;
    }
    return false;
}
// Removes and returns the first buffered block (iterator order) that is not
// transmit-pending, has neither its pending_mask nor repair_mask bit set, and
// is not the caller-excluded block (when "excludeBlock" is true).
// Returns NULL when the buffer is empty or no block qualifies.
NormBlock* NormObject::StealNonPendingBlock(bool excludeBlock, NormBlockId excludeId)
{
    if (block_buffer.IsEmpty()) return NULL;

    NormBlockBuffer::Iterator blockIterator(block_buffer);
    NormBlock* candidate;
    while (NULL != (candidate = blockIterator.GetNextBlock()))
    {
        NormBlockId candidateId = candidate->GetId();
        // Checks are made in the same order as a short-circuit "||" chain
        if (candidate->IsTransmitPending()) continue;
        if (pending_mask.Test(candidateId.GetValue())) continue;
        if (repair_mask.Test(candidateId.GetValue())) continue;
        if (excludeBlock && (excludeId == candidateId)) continue;
        block_buffer.Remove(candidate);
        return candidate;
    }
    return NULL;
}
// Removes and returns the block found at the high end of block_buffer's
// range.  Returns NULL if the buffer is empty or if that block is the one the
// caller asked to exclude.
NormBlock* NormObject::StealNewestBlock(bool excludeBlock, NormBlockId excludeId)
{
    if (block_buffer.IsEmpty()) return NULL;

    NormBlock* newest = block_buffer.Find(block_buffer.RangeHi());
    if (excludeBlock && (excludeId == newest->GetId()))
        return NULL;  // caller still needs this one

    block_buffer.Remove(newest);
    return newest;
}
// Removes and returns the block found at the low end of block_buffer's
// range.  Returns NULL if the buffer is empty or if that block is the one the
// caller asked to exclude.
NormBlock* NormObject::StealOldestBlock(bool excludeBlock, NormBlockId excludeId)
{
    if (block_buffer.IsEmpty()) return NULL;

    NormBlock* oldest = block_buffer.Find(block_buffer.RangeLo());
    if (excludeBlock && (excludeId == oldest->GetId()))
        return NULL;  // caller still needs this one

    block_buffer.Remove(oldest);
    return oldest;
}
// Fills "msg" with the next message this sender-side object has pending.
//
// If "pending_info" is set, "msg" is treated as a NormInfoMsg, loaded with
// the object's info and the function returns true.  Otherwise "msg" is
// treated as a NormDataMsg and loaded with the first pending segment of the
// first pending block (source data read via ReadSegment(), or a parity
// segment from the block's segment list).
//
// Returns true when "msg" was built.  Returns false when nothing could be
// built: no pending blocks (and a stream could not advance), no free
// blocks/segments available from the session, pending repairs prevent the
// block buffer from making room, ReadSegment() produced no data, or the
// fec_id is not one of 2, 5, 129 while an FTI extension is required.
bool NormObject::NextSenderMsg(NormObjectMsg* msg)
{
    // Initialize the message according to the type about to be built
    if (pending_info)
    {
        NormInfoMsg* infoMsg = static_cast<NormInfoMsg*>(msg);
        infoMsg->Init();
        infoMsg->SetFecId(fec_id);
    }
    else
    {
        NormDataMsg* dataMsg = static_cast<NormDataMsg*>(msg);
        dataMsg->Init();
        dataMsg->SetFecId(fec_id);
    }
    msg->ResetFlags();
    switch(type)
    {
        case STREAM:
            msg->SetFlag(NormObjectMsg::FLAG_STREAM);
            break;
        case FILE:
            msg->SetFlag(NormObjectMsg::FLAG_FILE);
            break;
        default:
            break;
    }
    NormSession::FtiMode ftiMode = session.SenderFtiMode();
    if ((NULL != info_ptr) || (NormSession::FTI_INFO == ftiMode))
        msg->SetFlag(NormObjectMsg::FLAG_INFO);
    msg->SetObjectId(transport_id);

    // Attach an FTI header extension when the session's FTI mode calls for it
    if ((NormSession::FTI_ALWAYS == ftiMode) ||
        (pending_info && (NormSession::FTI_INFO == ftiMode)))
    {
        switch (fec_id)
        {
            case 2:
            {
                NormFtiExtension2 fti;
                msg->AttachExtension(fti);
                fti.SetObjectSize(object_size);
                fti.SetFecFieldSize(fec_m);
                fti.SetFecGroupSize(1);
                fti.SetSegmentSize(segment_size);
                fti.SetFecMaxBlockLen(ndata);
                fti.SetFecNumParity(nparity);
                break;
            }
            case 5:
            {
                NormFtiExtension5 fti;
                msg->AttachExtension(fti);
                fti.SetObjectSize(object_size);
                fti.SetSegmentSize(segment_size);
                fti.SetFecMaxBlockLen((UINT8)ndata);
                fti.SetFecNumParity((UINT8)nparity);
                break;
            }
            case 129:
            {
                NormFtiExtension129 fti;
                msg->AttachExtension(fti);
                fti.SetObjectSize(object_size);
                fti.SetFecInstanceId(0);
                fti.SetSegmentSize(segment_size);
                fti.SetFecMaxBlockLen(ndata);
                fti.SetFecNumParity(nparity);
                break;
            }
            default:
                ASSERT(0);
                return false;
        }
    }

    // NORM_INFO case: load the info payload and we're done
    if (pending_info)
    {
        NormInfoMsg* infoMsg = static_cast<NormInfoMsg*>(msg);
        infoMsg->SetInfo(info_ptr, info_len);
        pending_info = false;
        return true;
    }

    // NORM_DATA case: locate a pending block/segment and load its payload
    NormDataMsg* data = static_cast<NormDataMsg*>(msg);
    NormBlock* block = NULL;
    NormBlockId blockId;
    UINT16 numData = 0;
    NormSegmentId segmentId;
    bool squelchQueued = false;  // so at most one squelch is queued per call
    while (NULL == block)
    {
        if (!GetFirstPending(blockId))
        {
            if (IsStream())
            {
                // Nothing pending; see if the stream can be advanced
                if (static_cast<NormStreamObject*>(this)->StreamAdvance())
                {
                    continue;
                }
                else
                {
                    return false;
                }
            }
            else
            {
                PLOG(PL_FATAL, "NormObject::NextSenderMsg() pending object w/ no pending blocks?!\n");
                return false;
            }
        }
        numData = GetBlockSize(blockId);
        block = block_buffer.Find(blockId);
        if (!block)
        {
            // Block state is not buffered, so build it from free resources
            if (NULL == (block = session.SenderGetFreeBlock(transport_id, blockId)))
            {
                PLOG(PL_INFO, "NormObject::NextSenderMsg() node>%lu warning: sender resource "
                              "constrained (no free blocks).\n", (unsigned long)LocalNodeId());
                return false;
            }
            // Load the block with zero-initialized parity segments
            UINT16 totalBlockLen = numData + nparity;
            for (UINT16 i = numData; i < totalBlockLen; i++)
            {
                char* s = session.SenderGetFreeSegment(transport_id, blockId);
                if (s)
                {
                    UINT16 payloadMax = segment_size + NormDataMsg::GetStreamPayloadHeaderLength();
#ifdef SIMULATE
                    payloadMax = MIN(payloadMax, SIM_PAYLOAD_MAX);
#endif
                    // These two statements must be on their own lines; anything
                    // after "#endif" on the same line is not compiled.
                    memset(s, 0, payloadMax);
                    block->AttachSegment(i, s);
                }
                else
                {
                    PLOG(PL_INFO, "NormObject::NextSenderMsg() node>%lu warning: sender resource "
                                  "constrained (no free segments).\n", (unsigned long)LocalNodeId());
                    session.SenderPutFreeBlock(block);
                    return false;
                }
            }
            block->TxInit(blockId, numData, session.SenderAutoParity());
            if (Compare(blockId, max_pending_block) < 0)
                block->SetFlag(NormBlock::IN_REPAIR);
            while (!block_buffer.Insert(block))
            {
                if (Compare(blockId, block_buffer.RangeLo()) > 0)
                {
                    // Try to make room by dropping the lowest buffered block
                    NormBlock* lowBlock = block_buffer.Find(block_buffer.RangeLo());
                    NormBlockId lowBlockId = lowBlock->GetId();
                    // Only a stream object may be downcast to query push mode;
                    // any other object type is treated as non-push.
                    bool push = IsStream() ? static_cast<NormStreamObject*>(this)->GetPushMode() : false;
                    if (!push && (lowBlock->IsRepairPending() || IsRepairSet(lowBlockId)))
                    {
                        PLOG(PL_DEBUG, "NormObject::NextSenderMsg() node>%lu pending repairs delaying stream progress\n",
                                       (unsigned long)LocalNodeId());
                        session.SenderPutFreeBlock(block);
                        return false;
                    }
                    else
                    {
                        block_buffer.Remove(lowBlock);
                        repair_mask.Unset(lowBlockId.GetValue());
                        pending_mask.Unset(lowBlockId.GetValue());
                        if (IsStream()) static_cast<NormStreamObject*>(this)->UnlockBlock(lowBlockId);
                        session.SenderPutFreeBlock(lowBlock);
                        continue;
                    }
                }
                else if (IsStream())
                {
                    // Requested block is not above the buffer's low end: give up on it
                    PLOG(PL_WARN, "NormObject::NextSenderMsg() node>%lu Warning! can't repair old stream block\n",
                                  (unsigned long)LocalNodeId());
                    if (!squelchQueued)
                    {
                        session.SenderQueueSquelch(transport_id);
                        squelchQueued = true;
                    }
                    session.SenderPutFreeBlock(block);
                    repair_mask.Unset(blockId.GetValue());
                    pending_mask.Unset(blockId.GetValue());
                    static_cast<NormStreamObject*>(this)->UnlockBlock(blockId);
                    block = NULL;
                    break;
                }
                else
                {
                    PLOG(PL_FATAL, "NormObject::NextSenderMsg() invalid non-stream state!\n");
                    ASSERT(0);
                    return false;
                }
            }
            if (NULL == block) continue;  // block was abandoned above; look for another
        }
        if (!block->GetFirstPending(segmentId))
        {
            PLOG(PL_ERROR, "NormObject::NextSenderMsg() warning: found pending block %lu with nothing pending!?\n",
                           (unsigned long)blockId.GetValue());
            pending_mask.Unset(blockId.GetValue());
            block = NULL;
            continue;
        }
        if (segmentId < numData)
        {
            // Source data segment: read it straight into the message payload
            char* buffer = data->AccessPayload();
            UINT16 payloadLength = ReadSegment(blockId, segmentId, buffer);
            if (0 == payloadLength)
            {
                if (!IsStream())
                {
                    PLOG(PL_FATAL, "NormObject::NextSenderMsg() ReadSegment() error\n");
                    return false;
                }
                else if (static_cast<NormStreamObject*>(this)->IsOldBlock(blockId))
                {
                    PLOG(PL_ERROR, "NormObject::NextSenderMsg() node>%lu Warning! can't repair old stream segment\n",
                                   (unsigned long)LocalNodeId());
                    block->UnsetPending(segmentId);
                    if (!block->IsPending())
                    {
                        block->ResetParityCount(nparity);
                        pending_mask.Unset(blockId.GetValue());
                        if (session.SndrEmcon() && (HaveInfo() || (NormSession::FTI_INFO == session.SenderFtiMode())))
                            pending_info = true;
                    }
                    block = NULL;
                    continue;
                }
                else
                {
                    return false;
                }
            }
            data->SetPayloadLength(payloadLength);
            // Incremental parity encoding when this is the next segment the encoder expects
            if ((block->ParityReadiness() == segmentId) && (0 != nparity))
            {
                UINT16 payloadMax = segment_size + NormDataMsg::GetStreamPayloadHeaderLength();
#ifdef SIMULATE
                payloadMax = MIN(payloadMax, SIM_PAYLOAD_MAX);
#endif
                // Zero pad a short segment before encoding; the guard must be
                // compiled so the length passed to memset() cannot go negative.
                if (payloadLength < payloadMax)
                    memset(buffer+payloadLength, 0, payloadMax-payloadLength);
                block->UpdateSegSizeMax(payloadLength);
                session.SenderEncode(segmentId, data->AccessPayload(), block->SegmentList(numData));
                block->IncreaseParityReadiness();
            }
        }
        else
        {
            // Parity segment: make sure parity has been computed for the block
            if (!block->ParityReady(numData))
            {
                ASSERT(0 == block->ParityReadiness());
                CalculateBlockParity(block);
            }
            char* segment = block->GetSegment(segmentId);
            ASSERT(NULL != segment);
#ifdef SIMULATE
            UINT16 payloadMax = MIN(block->GetSegSizeMax(), SIM_PAYLOAD_MAX);
            data->SetPayload(segment, payloadMax);
            data->SetPayloadLength(block->GetSegSizeMax());
#else
            data->SetPayload(segment, block->GetSegSizeMax());
#endif
        }
    }
    block->UnsetPending(segmentId);
    data->SetFecPayloadId(fec_id, blockId.GetValue(), segmentId, numData, fec_m);
    if (!block->IsPending())
    {
        // Block has nothing left to send
        block->ResetParityCount(nparity);
        pending_mask.Unset(blockId.GetValue());
        if (session.SndrEmcon() && (HaveInfo() || (NormSession::FTI_INFO == session.SenderFtiMode())))
            pending_info = true;
        if (blockId == max_pending_block)
            Increment(max_pending_block);
    }
    if (session.GetFlowControl() > 0.0)
    {
        // With flow control enabled, the transmit time is recorded as the
        // "last nack time" for the object (and the stream block, if a stream)
        ProtoTime currentTime;
        currentTime.GetCurrentTime();
        SetLastNackTime(currentTime);
        if (IsStream())
            static_cast<NormStreamObject*>(this)->SetLastNackTime(blockId, currentTime);
    }
    if (!pending_mask.IsSet())
    {
        if (IsStream())
        {
            static_cast<NormStreamObject*>(this)->StreamAdvance();
        }
        else if (first_pass)
        {
            // First complete pass over a non-stream object
            first_pass = false;
            session.Notify(NormController::TX_OBJECT_SENT, NULL, this);
        }
    }
    return true;
}
bool NormStreamObject::StreamAdvance()
{
NormBlockId nextBlockId = stream_next_id;
if (repair_mask.CanSet(nextBlockId.GetValue()))
{
if (block_buffer.CanInsert(nextBlockId.GetValue()))
{
if (pending_mask.Set(nextBlockId.GetValue()))
{
Increment(stream_next_id);
return true;
}
else
{
PLOG(PL_ERROR, "NormStreamObject::StreamAdvance() error: node>%lu couldn't set set stream pending mask (1)\n",
(unsigned long)LocalNodeId());
}
}
else
{
NormBlock* block = block_buffer.Find(block_buffer.RangeLo());
ASSERT(NULL != block);
if (!block->IsTransmitPending())
{
if (pending_mask.Set(nextBlockId.GetValue()))
{
Increment(stream_next_id);
return true;
}
else
{
PLOG(PL_ERROR, "NormStreamObject::StreamAdvance() error: node>%lu couldn't set stream pending mask (2)\n",
(unsigned long)LocalNodeId());
}
}
else
{
PLOG(PL_DEBUG, "NormStreamObject::StreamAdvance() warning: node>%lu pending segment repairs (blk>%lu) "
"delaying stream advance ...\n", (unsigned long)LocalNodeId(),
(unsigned long)block->GetId().GetValue());
}
}
}
else
{
PLOG(PL_WARN, "NormStreamObject::StreamAdvance() warning: node>%lu pending block repair delaying stream advance ...\n",
(unsigned long)LocalNodeId());
}
return false;
}
// Returns the greater (per Compare()) of StreamBufferLo() and
// block_buffer.RangeMin(), considering each only when its buffer is
// non-empty.  With both buffers empty the result is block id 0.
NormBlockId NormStreamObject::RepairWindowLo() const
{
    NormBlockId windowLo(0);
    if (!stream_buffer.IsEmpty())
        windowLo = StreamBufferLo();

    if (block_buffer.IsEmpty())
        return windowLo;

    NormBlockId bufferMin = block_buffer.RangeMin();
    if (Compare(bufferMin, windowLo) > 0)
        return bufferMin;
    return windowLo;
}
// Computes parity for "block" by reading each of its source segments with
// ReadSegment() and feeding them to the session's encoder, then marks the
// block's parity readiness as complete.
// Returns true on success (or immediately when nparity is zero); returns
// false if any source segment could not be read (ReadSegment() returned 0).
bool NormObject::CalculateBlockParity(NormBlock* block)
{
    if (0 == nparity) return true;
    char buffer[NormMsg::MAX_SIZE];
    UINT16 numData = GetBlockSize(block->GetId());
    for (UINT16 i = 0; i < numData; i++)
    {
        UINT16 payloadLength = ReadSegment(block->GetId(), i, buffer);
        if (0 != payloadLength)
        {
            UINT16 payloadMax = segment_size+NormDataMsg::GetStreamPayloadHeaderLength();
#ifdef SIMULATE
            payloadMax = MIN(payloadMax, SIM_PAYLOAD_MAX);
#endif
            // Zero pad short segments before encoding.  The guard has to be on
            // its own line: text following "#endif" on the same line is not
            // compiled, which would leave the memset() unconditional and let
            // its length go negative for payloadLength > payloadMax.
            if (payloadLength < payloadMax)
                memset(buffer+payloadLength, 0, payloadMax-payloadLength+1);
            block->UpdateSegSizeMax(payloadLength);
            session.SenderEncode(i, buffer, block->SegmentList(numData));
        }
        else
        {
            return false;
        }
    }
    block->SetParityReadiness(numData);
    return true;
}
// Rebuilds sender-side state for "blockId": obtains a free block from the
// session, attaches zeroed parity segments, recomputes parity via
// CalculateBlockParity() and inserts the block into block_buffer.
// Returns the recovered block, or NULL if no free block/segment was
// available, parity calculation failed, or the block could not be buffered
// (in every failure case the block is handed back to the session).
NormBlock* NormObject::SenderRecoverBlock(NormBlockId blockId)
{
    NormBlock* block = session.SenderGetFreeBlock(transport_id, blockId);
    if (NULL == block)
        return (NormBlock*)NULL;

    UINT16 numData = GetBlockSize(blockId);
    block->TxRecover(blockId, numData, nparity);
    // Attach one zero-initialized segment per parity position
    UINT16 totalBlockLen = numData + nparity;
    for (UINT16 i = numData; i < totalBlockLen; i++)
    {
        char* s = session.SenderGetFreeSegment(transport_id, blockId);
        if (s)
        {
            UINT16 payloadMax = segment_size + NormDataMsg::GetStreamPayloadHeaderLength();
#ifdef SIMULATE
            payloadMax = MIN(payloadMax, SIM_PAYLOAD_MAX);
#endif
            // Must not share a line with "#endif": trailing text on that
            // line is not compiled, which would drop the segment on the floor.
            memset(s, 0, payloadMax);
            block->AttachSegment(i, s);
        }
        else
        {
            session.SenderPutFreeBlock(block);
            return (NormBlock*)NULL;
        }
    }
    if (CalculateBlockParity(block))
    {
        if (!block_buffer.Insert(block))
        {
            session.SenderPutFreeBlock(block);
            PLOG(PL_DEBUG, "NormObject::SenderRecoverBlock() node>%lu couldn't buffer recovered block\n",
                           (unsigned long)LocalNodeId());
            return NULL;
        }
        return block;
    }
    else
    {
        session.SenderPutFreeBlock(block);
        return (NormBlock*)NULL;
    }
}
// Constructs a FILE-type object with zeroed block lengths and an empty path.
NormFileObject::NormFileObject(class NormSession&       theSession,
                               class NormSenderNode*    theSender,
                               const NormObjectId&      objectId)
 : NormObject(FILE, theSession, theSender, objectId),
   large_block_length(0),
   small_block_length(0)
{
    // No file is associated until Open() succeeds
    path[0] = '\0';
}
// Closes the object (and its file, via Close()) on destruction.
NormFileObject::~NormFileObject()
{
    Close();
}
// Opens the file at "thePath" for this object.
//  - With a sender set (receive side): refuses a locked file, otherwise
//    opens it read/write (create + truncate) and tries to lock it (a lock
//    failure is only a warning).
//  - Without a sender (send side): requires a NORMAL file, opens it
//    read-only and opens the base NormObject with the file size and the
//    session's sender parameters.
// On success the block lengths are computed and the path is stored.
// Returns false (after logging) on any failure.
bool NormFileObject::Open(const char* thePath,
                          const char* infoPtr,
                          UINT16      infoLen)
{
    if (sender)
    {
        if (NormFile::IsLocked(thePath))
        {
            PLOG(PL_FATAL, "NormFileObject::Open() Error trying to open locked file for recv!\n");
            return false;
        }
        if (!file.Open(thePath, O_RDWR | O_CREAT | O_TRUNC))
        {
            PLOG(PL_FATAL, "NormFileObject::Open() recv file.Open() error!\n");
            return false;
        }
        if (!file.Lock())
            PLOG(PL_WARN, "NormFileObject::Open() warning: NormFile::Lock() failure\n");
    }
    else
    {
        if (NormFile::NORMAL != NormFile::GetType(thePath))
        {
            PLOG(PL_FATAL, "NormFileObject::Open() send file \"%s\" is not a file "
                           "(a directory perhaps?)\n", thePath);
            return false;
        }
        if (!file.Open(thePath, O_RDONLY))
        {
            PLOG(PL_FATAL, "NormFileObject::Open() send file.Open() error!\n");
            return false;
        }
        NormObjectSize::Offset size = file.GetSize();
        bool opened = NormObject::Open(NormObjectSize(size),
                                       infoPtr,
                                       infoLen,
                                       session.SenderSegmentSize(),
                                       session.GetSenderFecId(),
                                       session.GetSenderFecFieldSize(),
                                       session.SenderBlockSize(),
                                       session.SenderNumParity());
        if (!opened)
        {
            PLOG(PL_FATAL, "NormFileObject::Open() send object open error\n");
            Close();
            return false;
        }
    }

    // Byte lengths of the two block sizes, used for file offset calculation
    large_block_length = NormObjectSize(large_block_size) * segment_size;
    small_block_length = NormObjectSize(small_block_size) * segment_size;

    // Remember the path (at most PATH_MAX characters are copied)
    strncpy(path, thePath, PATH_MAX);
    size_t len = strlen(thePath);
    len = MIN(len, PATH_MAX);
    if (len < PATH_MAX) path[len] = '\0';
    return true;
}
// Opens "thePath" for this object and, if that works, marks the object
// accepted.  Returns whether Open() succeeded.
bool NormFileObject::Accept(const char* thePath)
{
    if (!Open(thePath))
        return false;
    NormObject::Accept();
    return true;
}
// Closes the underlying file if it is open; when a sender is set the file
// is unlocked first.
void NormFileObject::CloseFile()
{
    if (!file.IsOpen()) return;
    if (NULL != sender)
        file.Unlock();
    file.Close();
}
// Closes the file first, then the base object state.
void NormFileObject::Close()
{
    CloseFile();
    NormObject::Close();
}
// Writes one segment from "buffer" into the file at the offset derived from
// blockId/segmentId.  The last segment of the final block is written with
// final_segment_size bytes, every other segment with segment_size bytes.
// Returns true only if the seek (when needed) succeeded and the full length
// was written.
bool NormFileObject::WriteSegment(NormBlockId   blockId,
                                  NormSegmentId segmentId,
                                  const char*   buffer)
{
    size_t len = segment_size;
    if ((blockId == final_block_id) && (segmentId == (GetBlockSize(blockId)-1)))
        len = final_segment_size;

    // Blocks below large_block_count use the large block length; the
    // remaining blocks follow them using the small block length
    NormObjectSize segmentOffset;
    NormObjectSize segmentSize = NormObjectSize(segment_size);
    if (blockId.GetValue() < large_block_count)
    {
        segmentOffset = (large_block_length*blockId.GetValue()) + (segmentSize*segmentId);
    }
    else
    {
        segmentOffset = large_block_length*large_block_count;
        UINT32 smallBlockIndex = blockId.GetValue() - large_block_count;
        segmentOffset = segmentOffset + small_block_length*smallBlockIndex +
                                        segmentSize*segmentId;
    }

    // Seek only when the file is not already positioned there
    NormFile::Offset offset = segmentOffset.GetOffset();
    if (offset != file.GetOffset())
    {
        if (!file.Seek(offset)) return false;
    }
    size_t written = file.Write(buffer, len);
    return (written == len);
}
// Reads one segment from the file into "buffer" at the offset derived from
// blockId/segmentId.  The last segment of the final block is
// final_segment_size bytes long, every other segment segment_size bytes.
// Returns the number of bytes read, or 0 if the seek failed or fewer bytes
// than expected were read.
UINT16 NormFileObject::ReadSegment(NormBlockId      blockId,
                                   NormSegmentId    segmentId,
                                   char*            buffer)
{
    size_t len = segment_size;
    if ((blockId == final_block_id) && (segmentId == (GetBlockSize(blockId) - 1)))
        len = final_segment_size;

    // Blocks below large_block_count use the large block length; the
    // remaining blocks follow them using the small block length
    NormObjectSize segmentOffset;
    NormObjectSize segmentSize = NormObjectSize(segment_size);
    if (blockId.GetValue() < large_block_count)
    {
        segmentOffset = large_block_length*blockId.GetValue() + segmentSize*segmentId;
    }
    else
    {
        segmentOffset = large_block_length*large_block_count;
        UINT32 smallBlockIndex = blockId.GetValue() - large_block_count;
        segmentOffset = segmentOffset + small_block_length*smallBlockIndex +
                                        segmentSize*segmentId;
    }

    // Seek only when the file is not already positioned there
    NormFile::Offset offset = segmentOffset.GetOffset();
    if (offset != file.GetOffset())
    {
        if (!file.Seek(offset))
        {
            PLOG(PL_FATAL, "NormFileObject::ReadSegment() error seeking to file offset\n");
            return 0;
        }
    }
    size_t got = file.Read(buffer, len);
    if (len != got)
        return 0;  // short read is treated as failure
    return (UINT16)len;
}
// Reads the requested segment from the file into the sender node's
// retrieval segment, zero-filling up to segment_size when the segment is
// short.  Returns that segment buffer, or NULL when there is no sender or
// the read failed.
char* NormFileObject::RetrieveSegment(NormBlockId   blockId,
                                      NormSegmentId segmentId)
{
    if (!sender)
    {
        PLOG(PL_FATAL, "NormFileObject::RetrieveSegment() error: NULL sender!\n");
        return NULL;
    }

    char* segment = sender->GetRetrievalSegment();
    UINT16 len = ReadSegment(blockId, segmentId, segment);
    if (0 == len)
    {
        PLOG(PL_FATAL, "NormFileObject::RetrieveSegment() error reading segment\n");
        return NULL;
    }
    // Zero the remainder of a short segment
    if (len < segment_size)
        memset(segment+len, 0, segment_size-len);
    return segment;
}
// Constructs a DATA-type object with no data buffer attached yet.
// "dataFreeFunc" is stored and, when non-NULL, is what Open() and the
// destructor call to free a released data buffer instead of delete[].
NormDataObject::NormDataObject(class NormSession&       theSession,
                               class NormSenderNode*    theSender,
                               const NormObjectId&      objectId,
                               DataFreeFunctionHandle   dataFreeFunc)
 : NormObject(DATA, theSession, theSender, objectId),
   large_block_length(0),
   small_block_length(0),
   data_ptr(NULL),
   data_max(0),
   data_released(false),
   data_free_func(dataFreeFunc)
{
}
// Closes the object and, if the data buffer was marked released to this
// object, frees it with data_free_func (when set) or delete[].
NormDataObject::~NormDataObject()
{
    Close();
    if (!data_released) return;

    if (NULL != data_ptr)
    {
        if (NULL == data_free_func)
            delete[] data_ptr;
        else
            data_free_func(data_ptr);
        data_ptr = NULL;
    }
    data_released = false;
}
// Associates "dataPtr"/"dataLen" with this object.  A previously held,
// released buffer is freed first.  When there is no sender (send side) the
// base NormObject is opened with the session's sender parameters; failure
// there closes the object and returns false.  "dataRelease" records whether
// this object is responsible for freeing the buffer later.
bool NormDataObject::Open(char*       dataPtr,
                          UINT32      dataLen,
                          bool        dataRelease,
                          const char* infoPtr,
                          UINT16      infoLen)
{
    // Drop any buffer we were previously made responsible for
    if (data_released && (NULL != data_ptr))
    {
        if (NULL == data_free_func)
            delete[] data_ptr;
        else
            data_free_func(data_ptr);
        data_ptr = NULL;
        data_released = false;
    }

    if (NULL == sender)
    {
        bool opened = NormObject::Open(dataLen,
                                       infoPtr,
                                       infoLen,
                                       session.SenderSegmentSize(),
                                       session.GetSenderFecId(),
                                       session.GetSenderFecFieldSize(),
                                       session.SenderBlockSize(),
                                       session.SenderNumParity());
        if (!opened)
        {
            PLOG(PL_FATAL, "NormDataObject::Open() send object open error\n");
            Close();
            return false;
        }
    }
    // (nothing extra is done here when a sender is set)

    data_ptr = dataPtr;
    data_max = dataLen;
    data_released = dataRelease;
    large_block_length = NormObjectSize(large_block_size) * segment_size;
    small_block_length = NormObjectSize(small_block_size) * segment_size;
    return true;
}
// Opens the object on the supplied buffer and, if that works, marks the
// object accepted.  Returns whether Open() succeeded.
bool NormDataObject::Accept(char* dataPtr, UINT32 dataMax, bool dataRelease)
{
    if (!Open(dataPtr, dataMax, dataRelease))
        return false;
    NormObject::Accept();
    return true;
}
// Closes the base object state; the data buffer itself is not touched here.
void NormDataObject::Close()
{
    NormObject::Close();
}
// Copies one segment from "buffer" into the data buffer at the offset
// derived from blockId/segmentId.  A segment starting at or beyond data_max
// is silently ignored, and one straddling data_max is truncated to fit.
// Returns false only when no data buffer is attached.
bool NormDataObject::WriteSegment(NormBlockId   blockId,
                                  NormSegmentId segmentId,
                                  const char*   buffer)
{
    if (NULL == data_ptr)
    {
        PLOG(PL_FATAL, "NormDataObject::WriteSegment() error: NULL data_ptr\n");
        return false;
    }

    UINT16 len = segment_size;
    if ((blockId == final_block_id) && (segmentId == (GetBlockSize(blockId)-1)))
        len = final_segment_size;

    // Blocks below large_block_count use the large block length; the
    // remaining blocks follow them using the small block length
    NormObjectSize segmentOffset;
    NormObjectSize segmentSize = NormObjectSize(segment_size);
    if (blockId.GetValue() < large_block_count)
    {
        segmentOffset = large_block_length*blockId.GetValue() + segmentSize*segmentId;
    }
    else
    {
        segmentOffset = large_block_length*large_block_count;
        UINT32 smallBlockIndex = blockId.GetValue() - large_block_count;
        segmentOffset = segmentOffset + small_block_length*smallBlockIndex +
                                        segmentSize*segmentId;
    }
    ASSERT(0 == segmentOffset.MSB());  // only the low-order offset word is used below

    if (data_max <= segmentOffset.LSB())
        return true;  // entirely outside the buffer; nothing to store
    if (data_max <= (segmentOffset.LSB() + len))
        len -= (segmentOffset.LSB() + len - data_max);  // clip to buffer end
    memcpy(data_ptr + segmentOffset.LSB(), buffer, len);
    return true;
}
// Copies one segment out of the data buffer into "buffer" from the offset
// derived from blockId/segmentId.  A segment straddling data_max is
// truncated to what is available.
// Returns the number of bytes copied; 0 when no data buffer is attached or
// the segment starts at or beyond data_max.
UINT16 NormDataObject::ReadSegment(NormBlockId      blockId,
                                   NormSegmentId    segmentId,
                                   char*            buffer)
{
    if (NULL == data_ptr)
    {
        PLOG(PL_FATAL, "NormDataObject::ReadSegment() error: NULL data_ptr\n");
        return 0;
    }

    UINT16 len = segment_size;
    if ((blockId == final_block_id) && (segmentId == (GetBlockSize(blockId)-1)))
        len = final_segment_size;

    // Blocks below large_block_count use the large block length; the
    // remaining blocks follow them using the small block length
    NormObjectSize segmentOffset;
    NormObjectSize segmentSize = NormObjectSize(segment_size);
    if (blockId.GetValue() < large_block_count)
    {
        segmentOffset = large_block_length*blockId.GetValue() + segmentSize*segmentId;
    }
    else
    {
        segmentOffset = large_block_length*large_block_count;
        UINT32 smallBlockIndex = blockId.GetValue() - large_block_count;
        segmentOffset = segmentOffset + small_block_length*smallBlockIndex +
                                        segmentSize*segmentId;
    }
    ASSERT(0 == segmentOffset.MSB());  // only the low-order offset word is used below

    if (data_max <= segmentOffset.LSB())
        return 0;  // entirely outside the buffer
    if (data_max <= (segmentOffset.LSB() + len))
        len -= (segmentOffset.LSB() + len - data_max);  // clip to buffer end
    memcpy(buffer, data_ptr + segmentOffset.LSB(), len);
    return len;
}
// Returns a pointer to the requested segment's bytes.
//  - A full-size segment lying wholly inside the data buffer is returned as
//    a pointer directly into data_ptr.
//  - A short segment, or one extending past data_max, is copied into the
//    sender node's retrieval segment and zero-filled up to segment_size.
// Returns NULL when no data buffer is attached, or when a copy is required
// but there is no sender.
char* NormDataObject::RetrieveSegment(NormBlockId   blockId,
                                      NormSegmentId segmentId)
{
    if (NULL == data_ptr)
    {
        PLOG(PL_FATAL, "NormDataObject::RetrieveSegment() error: NULL data_ptr\n");
        return NULL;
    }

    UINT16 len = segment_size;
    if ((blockId == final_block_id) && (segmentId == (GetBlockSize(blockId)-1)))
        len = final_segment_size;

    // Blocks below large_block_count use the large block length; the
    // remaining blocks follow them using the small block length
    NormObjectSize segmentOffset;
    NormObjectSize segmentSize = NormObjectSize(segment_size);
    if (blockId.GetValue() < large_block_count)
    {
        segmentOffset = large_block_length*blockId.GetValue() + segmentSize*segmentId;
    }
    else
    {
        segmentOffset = large_block_length*large_block_count;
        UINT32 smallBlockIndex = blockId.GetValue() - large_block_count;
        segmentOffset = segmentOffset + small_block_length*smallBlockIndex +
                                        segmentSize*segmentId;
    }
    ASSERT(0 == segmentOffset.MSB());  // only the low-order offset word is used below

    bool needsCopy = (len < segment_size) || (data_max < (segmentOffset.LSB() + len));
    if (!needsCopy)
        return (data_ptr + segmentOffset.LSB());

    if (!sender)
    {
        PLOG(PL_FATAL, "NormDataObject::RetrieveSegment() error: NULL sender!\n");
        return NULL;
    }
    char* segment = sender->GetRetrievalSegment();
    len = ReadSegment(blockId, segmentId, segment);
    memset(segment+len, 0, segment_size-len);
    return segment;
}
// Constructs a STREAM-type object with all stream state flags at their
// initial values (read_init and msg_start true, everything else
// false/none/zero).
NormStreamObject::NormStreamObject(class NormSession&       theSession,
                                   class NormSenderNode*    theSender,
                                   const NormObjectId&      objectId)
 : NormObject(STREAM, theSession, theSender, objectId),
   stream_sync(false),
   write_vacancy(false),
   read_init(true),
   read_ready(false),
   flush_pending(false),
   msg_start(true),
   flush_mode(FLUSH_NONE),
   push_mode(false),
   stream_broken(false),
   stream_closing(false),
   block_pool_threshold(0)
{
}
// Closes the stream, returns every buffered block (and its segments) to the
// pools, then destroys the stream buffer and both pools.
NormStreamObject::~NormStreamObject()
{
    Close();
    tx_offset = write_offset = read_offset = 0;

    // Drain the stream buffer from its low end
    for (NormBlock* blk = stream_buffer.Find(stream_buffer.RangeLo());
         NULL != blk;
         blk = stream_buffer.Find(stream_buffer.RangeLo()))
    {
        stream_buffer.Remove(blk);
        blk->EmptyToPool(segment_pool);
        block_pool.Put(blk);
    }

    stream_buffer.Destroy();
    segment_pool.Destroy();
    block_pool.Destroy();
}
// Returns write_index.block when write_index.segment is non-zero, otherwise
// the block id just before it.
NormBlockId NormStreamObject::FlushBlockId() const
{
    NormBlockId flushId = write_index.block;
    if (0 == write_index.segment)
        Decrement(flushId);  // nothing written in the current block yet
    return flushId;
}
// Sets up stream buffering for roughly "bufferSize" bytes (rounded to whole
// blocks, minimum two blocks; doubled when "doubleBuffer" is true).
// Without a sender (send side) the base NormObject is also opened using the
// session's sender parameters.  Initializes the block pool, segment pool
// and stream buffer, then resets all read/write/tx indices and flags.
// Returns false (after logging, and Close() for failures past the size
// check) if bufferSize is zero or any initialization step fails.
bool NormStreamObject::Open(UINT32      bufferSize,
                            bool        doubleBuffer,
                            const char* infoPtr,
                            UINT16      infoLen)
{
    if (0 == bufferSize)
    {
        PLOG(PL_FATAL, "NormStreamObject::Open() zero bufferSize error\n");
        return false;
    }

    // Segment size and block size come from the object itself when a sender
    // is set, otherwise from the session's sender configuration
    UINT16 segmentSize, numData;
    if (NULL == sender)
    {
        segmentSize = session.SenderSegmentSize();
        numData = session.SenderBlockSize();
        stream_next_id = pending_mask.GetSize();
    }
    else
    {
        segmentSize = segment_size;
        numData = ndata;
    }

    // Round the buffer to a whole number of blocks (at least 2)
    UINT32 blockSize = segmentSize * numData;
    UINT32 numBlocks = bufferSize / blockSize;
    numBlocks = MAX(2, numBlocks);
    bufferSize = numBlocks * blockSize;

    if (NULL == sender)
    {
        bool opened = NormObject::Open(NormObjectSize((UINT32)bufferSize),
                                       infoPtr,
                                       infoLen,
                                       session.SenderSegmentSize(),
                                       session.GetSenderFecId(),
                                       session.GetSenderFecFieldSize(),
                                       session.SenderBlockSize(),
                                       session.SenderNumParity());
        if (!opened)
        {
            PLOG(PL_FATAL, "NormStreamObject::Open() object open error\n");
            Close();
            return false;
        }
    }

    if (doubleBuffer) numBlocks *= 2;
    UINT32 numSegments = numBlocks * numData;

    if (!block_pool.Init(numBlocks, numData))
    {
        PLOG(PL_FATAL, "NormStreamObject::Open() block_pool init error\n");
        Close();
        return false;
    }
    // Each pooled segment has room for the stream payload header too
    if (!segment_pool.Init(numSegments, segmentSize+NormDataMsg::GetStreamPayloadHeaderLength()))
    {
        PLOG(PL_FATAL, "NormStreamObject::Open() segment_pool init error\n");
        Close();
        return false;
    }
    if (!stream_buffer.Init(numBlocks, 256, fec_block_mask))
    {
        PLOG(PL_FATAL, "NormStreamObject::Open() stream_buffer init error\n");
        Close();
        return false;
    }

    // Fresh stream state
    read_init = true;
    read_index.block = read_index.segment = read_index.offset = 0;
    write_index.block = write_index.segment = 0;
    tx_index.block = tx_index.segment = 0;
    tx_offset = write_offset = read_offset = 0;
    write_vacancy = true;
    stream_sync = false;
    flush_pending = false;
    msg_start = true;
    stream_closing = false;
    return true;
}
// Accepts the stream: a no-op returning true if already accepted, otherwise
// opens the stream buffering and marks the object accepted.
// Returns false if Open() fails.
bool NormStreamObject::Accept(UINT32 bufferSize, bool doubleBuffer)
{
    if (Accepted())
        return true;
    if (!Open(bufferSize, doubleBuffer))
        return false;
    NormObject::Accept();
    return true;
}
void NormStreamObject::Close(bool graceful)
{
if (graceful && (NULL == sender))
{
Terminate();
}
else
{
NormObject::Close();
write_vacancy = false;
}
}
// Marks every source segment of blocks firstId..lastId (inclusive) as
// pending in the stream buffer and stamps each block with "currentTime" as
// its last nack time.  Nothing is modified unless all blocks in the range
// are present.  Returns false if any block in the range is missing.
bool NormStreamObject::LockBlocks(NormBlockId firstId, NormBlockId lastId, const ProtoTime& currentTime)
{
    // Pass 1: verify that the whole range is buffered
    NormBlockId checkId = firstId;
    while (Compare(checkId, lastId) <= 0)
    {
        if (NULL == stream_buffer.Find(checkId)) return false;
        Increment(checkId);
    }

    // Pass 2: lock each block
    NormBlockId lockId = firstId;
    while (Compare(lockId, lastId) <= 0)
    {
        NormBlock* blk = stream_buffer.Find(lockId);
        if (NULL != blk)
        {
            UINT16 numData = GetBlockSize(lockId);
            blk->SetPending(0, numData);
            blk->SetLastNackTime(currentTime);
        }
        Increment(lockId);
    }
    return true;
}
// Clears all pending flags of the given block in the stream buffer, if the
// block is present.
void NormStreamObject::UnlockBlock(NormBlockId blockId)
{
    NormBlock* blk = stream_buffer.Find(blockId);
    if (NULL == blk) return;
    blk->ClearPending();
}
// Marks segments firstId..lastId (inclusive) of the given stream buffer
// block as pending.  Returns false if the block is not buffered.
bool NormStreamObject::LockSegments(NormBlockId blockId, NormSegmentId firstId, NormSegmentId lastId)
{
    NormBlock* blk = stream_buffer.Find(blockId);
    if (!blk)
        return false;
    ASSERT(firstId <= lastId);
    blk->SetPending(firstId, (lastId - firstId + 1));
    return true;
}
// Updates the stream's pending-block bookkeeping for "blockId".
//  - Not yet synced: flushes block_buffer, resets pending_mask to a full
//    window starting at blockId, records the sync point and (for a fresh
//    receive stream) initializes the read indices.  Returns true.
//  - Synced and blockId precedes stream_sync_id or stream_next_id: nothing
//    to do, returns true.
//  - Otherwise extends pending_mask up to blockId; returns false when the
//    mask cannot cover blockId.
bool NormStreamObject::StreamUpdateStatus(NormBlockId blockId)
{
    if (!stream_sync)
    {
        // Empty the block buffer, handing blocks back to the sender node
        NormBlock* stale;
        while (NULL != (stale = block_buffer.Find(block_buffer.RangeLo())))
        {
            block_buffer.Remove(stale);
            sender->PutFreeBlock(stale);
        }
        pending_mask.Clear();
        pending_mask.SetBits(blockId.GetValue(), pending_mask.GetSize());
        stream_sync = true;
        stream_sync_id = blockId;
        stream_next_id = blockId;
        Increment(stream_next_id, pending_mask.GetSize());
        if ((NULL != sender) && read_init)
        {
            PLOG(PL_DEBUG, "NormStreamObject::StreamUpdateStatus() syncing stream to blockId: %lu\n",
                           (unsigned long)blockId.GetValue());
            read_init = false;
            read_index.block = blockId;
            read_index.segment = 0;
            read_index.offset = 0;
            read_offset = 0;
            sender->DecrementResyncCount();
            // Syncing anywhere but block zero counts as a broken stream
            // unless the sender's sync policy is SYNC_CURRENT
            if ((NormSenderNode::SYNC_CURRENT != sender->GetSyncPolicy()) &&
                (0 != blockId.GetValue()))
            {
                stream_broken = true;
            }
        }
        return true;
    }

    // Already synced: ids before the sync point or before stream_next_id
    // require no update
    if (Compare(blockId, stream_sync_id) < 0)
        return true;
    if (Compare(blockId, stream_next_id) < 0)
        return true;

    if (pending_mask.IsSet())
    {
        if (!pending_mask.CanSet(blockId.GetValue()))
            return false;
        // Mark everything from stream_next_id through blockId as pending
        UINT32 numBits = (UINT32)Difference(blockId, stream_next_id) + 1;
        pending_mask.SetBits(stream_next_id.GetValue(), numBits);
        stream_next_id = blockId;
        Increment(stream_next_id);
    }
    else
    {
        UINT32 span = (UINT32)Difference(blockId, stream_next_id) + 1;
        if (span > pending_mask.GetSize())
            return false;
        // Mask is empty: restart it as a full window beginning at blockId
        pending_mask.SetBits(blockId.GetValue(), pending_mask.GetSize());
        stream_next_id = blockId;
        Increment(stream_next_id, pending_mask.GetSize());
    }

    // Pull stream_sync_id forward once it trails by more than two mask sizes
    UINT32 delta = (UINT32)Difference(stream_next_id, stream_sync_id);
    if (delta > (2*pending_mask.GetSize()))
        GetFirstPending(stream_sync_id);
    return true;
}
// Returns the buffered segment for blockId/segmentId from the stream
// buffer, or NULL (after logging) when the block or the segment is not
// available.
char* NormStreamObject::RetrieveSegment(NormBlockId     blockId,
                                        NormSegmentId   segmentId)
{
    NormBlock* blk = stream_buffer.Find(blockId);
    if (NULL == blk)
    {
        PLOG(PL_FATAL, "NormStreamObject::RetrieveSegment() segment block unavailable\n");
        return NULL;
    }
    char* seg = blk->GetSegment(segmentId);
    if (NULL == seg)
        PLOG(PL_FATAL, "NormStreamObject::RetrieveSegment() segment unavailable\n");
    return seg;
}
// Copies the buffered stream segment blockId/segmentId (payload header
// plus data) into "buffer" for transmission.
// Side effects: clears the segment's pending flag in the stream buffer
// block, advances tx_index if this segment is beyond it, and, when
// write_vacancy is false, may set write_vacancy / post TX_QUEUE_VACANCY or
// activate session flow control.
// Returns the payload length (segment length + stream payload header
// length), or 0 when the block is not buffered or the segment is at or past
// the current write position.
UINT16 NormStreamObject::ReadSegment(NormBlockId      blockId,
                                     NormSegmentId    segmentId,
                                     char*            buffer)
{
    NormBlock* block = stream_buffer.Find(blockId);
    if (NULL == block)
    {
        if (!stream_buffer.IsEmpty() && (Compare(blockId, stream_buffer.RangeLo()) < 0))
        {
            PLOG(PL_ERROR, "NormStreamObject::ReadSegment() error: attempted to read old block> %lu\n",
                           (unsigned long)blockId.GetValue());
        }
        return 0;
    }
    // Segment has not been written yet
    if ((blockId == write_index.block) && (segmentId >= write_index.segment))
    {
        return 0;
    }
    block->UnsetPending(segmentId);
    char* segment = block->GetSegment(segmentId);
    ASSERT(segment != NULL);

    // Keep tx_index at the highest block/segment read so far
    if (Compare(blockId, tx_index.block) > 0)
    {
        tx_index.block = blockId;
        tx_index.segment = segmentId;
    }
    else if ((blockId == tx_index.block) && (segmentId > tx_index.segment))
    {
        tx_index.segment = segmentId;
    }

    if (!write_vacancy)
    {
        ASSERT(Compare(write_index.block, tx_index.block) >= 0);
        UINT32 blockDelta = (UINT32)Difference(write_index.block, tx_index.block);
        // Only consider vacancy once the write/tx gap is at most half the pool
        if (blockDelta <= (block_pool.GetTotal() >> 1))
        {
            NormBlock* b = stream_buffer.Find(stream_buffer.RangeLo());
            if (NULL != b)
            {
                if (!b->IsPending())
                {
                    // Remaining flow control hold time for the oldest block
                    double delay = session.GetFlowControlDelay() - b->GetNackAge();
                    if (delay < 1.0e-06)
                    {
                        if (session.FlowControlIsActive() && (session.GetFlowControlObject() == GetId()))
                            session.DeactivateFlowControl();
                        write_vacancy = true;
                    }
                    else
                    {
                        if (!session.FlowControlIsActive())
                        {
                            session.ActivateFlowControl(delay, GetId(), NormController::TX_QUEUE_VACANCY);
                            PLOG(PL_DEBUG, "NormStreamObject::ReadSegment() asserting flow control for stream (postedEmpty:%d)\n",
                                            session.GetPostedTxQueueEmpty());
                        }
                    }
                }
            }
            else
            {
                write_vacancy = true;
            }
            if (write_vacancy)
                session.Notify(NormController::TX_QUEUE_VACANCY, NULL, this);
        }
    }

    UINT16 segmentLength = NormDataMsg::ReadStreamPayloadLength(segment);
    ASSERT(segmentLength <= segment_size);
    UINT16 payloadLength = segmentLength+NormDataMsg::GetStreamPayloadHeaderLength();
#ifdef SIMULATE
    UINT16 payloadMax = segment_size + NormDataMsg::GetStreamPayloadHeaderLength();
    payloadMax = MIN(payloadMax, SIM_PAYLOAD_MAX);
    UINT16 copyMax = MIN(payloadMax, payloadLength);
    memcpy(buffer, segment, copyMax);
#else
    memcpy(buffer, segment, payloadLength);
#endif
    // The return must be on its own line; text after "#endif" on the same
    // line is not compiled and the function would end without returning.
    return payloadLength;
}
// Stores a received stream segment ("segment" = payload header + data) in
// the stream buffer under blockId/segmentId.
//
// If the block is not yet buffered, room is made as needed by discarding the
// oldest buffered blocks.  Before discarding pending data an
// RX_OBJECT_UPDATED notification is posted (when notify_on_update is set);
// if neither the oldest block nor read_index changed across it, the pending
// segment is dropped and the stream is flagged broken.
//
// Returns false when the segment precedes read_index or its block is older
// than the oldest buffered block; true otherwise (including when the
// segment was already buffered).
bool NormStreamObject::WriteSegment(NormBlockId   blockId,
                                    NormSegmentId segmentId,
                                    const char*   segment)
{
    ASSERT(!read_init);
    if ((Compare(blockId, read_index.block) < 0) ||
        ((blockId == read_index.block) && (segmentId < read_index.segment)))
    {
        PLOG(PL_DEBUG, "NormStreamObject::WriteSegment() block/segment < read_index!?\n");
        return false;
    }
    NormBlock* block = stream_buffer.Find(blockId);
    if (NULL == block)
    {
        bool broken = false;
        bool dataLost = false;
        // Free up resources until a block is available and can be inserted
        while (block_pool.IsEmpty() || !stream_buffer.CanInsert(blockId))
        {
            block = stream_buffer.Find(stream_buffer.RangeLo());
            ASSERT(NULL != block);
            if (Compare(blockId, block->GetId()) < 0)
            {
                PLOG(PL_DEBUG, "NormStreamObject::WriteSegment() blockId too old!?\n");
                return false;
            }
            while (block->IsPending())
            {
                // Force read_index forward to the oldest pending segment
                NormSegmentId firstPending;
                block->GetFirstPending(firstPending);
                if (read_index.block != block->GetId())
                {
                    read_index.block = block->GetId();
                    read_index.segment = firstPending;
                    read_index.offset = 0;
                    broken = true;
                    stream_broken = true;
                }
                if (read_index.segment < firstPending)
                {
                    read_index.segment = firstPending;
                    read_index.offset = 0;
                    broken = true;
                    stream_broken = true;
                }
                // Snapshot state so a change across the notification can be detected
                NormBlock* tempBlock = block;
                NormStreamObject::Index tempIndex = read_index;
                if (notify_on_update)
                {
                    notify_on_update = false;
                    session.Notify(NormController::RX_OBJECT_UPDATED, sender, this);
                }
                block = stream_buffer.Find(stream_buffer.RangeLo());
                if (tempBlock == block)
                {
                    if ( (tempIndex.block == read_index.block) &&
                         (tempIndex.segment == read_index.segment) &&
                         (tempIndex.offset == read_index.offset))
                    {
                        // Nothing moved: drop the pending segment ourselves
                        dataLost = true;
                        broken = true;
                        stream_broken = true;
                        block->UnsetPending(read_index.segment++);
                        read_index.offset = 0;
                        if (read_index.segment >= ndata)
                        {
                            // Whole block consumed; recycle it
                            Increment(read_index.block);
                            read_index.segment = 0;
                            stream_buffer.Remove(block);
                            block->EmptyToPool(segment_pool);
                            block_pool.Put(block);
                            block = NULL;
                            Prune(read_index.block, false);
                            break;
                        }
                    }
                }
                else
                {
                    // Oldest block changed during the notification
                    block = NULL;
                    break;
                }
            }
            if (NULL != block)
            {
                // Oldest block has nothing pending any more; recycle it
                NormBlockId bid = block->GetId();
                stream_buffer.Remove(block);
                block->EmptyToPool(segment_pool);
                block_pool.Put(block);
                if (bid == read_index.block)
                {
                    stream_broken = true;
                    broken = true;
                    Increment(read_index.block);
                    read_index.segment = 0;
                    read_index.offset = 0;
                    Prune(read_index.block, false);
                }
            }
        }
        if (broken)
        {
            PLOG(PL_WARN, "NormStreamObject::WriteSegment() node>%lu obj>%hu blk>%lu seg>%hu broken stream ...\n",
                           (unsigned long)LocalNodeId(), (UINT16)transport_id,
                           (unsigned long)blockId.GetValue(), (UINT16)segmentId);
            if (dataLost)
                PLOG(PL_ERROR, "NormStreamObject::WriteSegment() broken stream data not read by app!\n");
        }
        block = block_pool.Get();
        block->SetId(blockId);
        block->ClearPending();
        ASSERT(Compare(blockId, read_index.block) >= 0);
        bool success = stream_buffer.Insert(block);
        ASSERT(success);
    }
    // Only store the segment if it has not been written already
    if(!block->GetSegment(segmentId))
    {
        char* s = segment_pool.Get();
        ASSERT(s != NULL);
        UINT16 payloadLength = NormDataMsg::ReadStreamPayloadLength(segment) + NormDataMsg::GetStreamPayloadHeaderLength();
#ifdef SIMULATE
        UINT16 payloadMax = segment_size + NormDataMsg::GetStreamPayloadHeaderLength();
        payloadMax = MIN(SIM_PAYLOAD_MAX, payloadMax);
        payloadLength = MIN(payloadMax, payloadLength);
#endif
        // The copy must be on its own line; text after "#endif" on the same
        // line is not compiled and the segment would be attached unfilled.
        memcpy(s, segment, payloadLength);
        block->AttachSegment(segmentId, s);
        block->SetPending(segmentId);
        if (!read_ready)
        {
            // Decide whether the stream is now ready for reading
            if ((blockId == read_index.block) && (segmentId == read_index.segment))
            {
                read_ready = true;
            }
            else if (block_pool.GetCount() < block_pool_threshold)
            {
                read_ready = true;
            }
            else if (session.RcvrIsLowDelay())
            {
                INT32 delta = (UINT32)Difference(blockId, read_index.block);
                if (delta > session.RcvrGetMaxDelay())
                    read_ready = true;
            }
        }
    }
    return true;
}
// Drops receive-side state for blocks that precede "blockId".
//
// When "updateStatus" is false, the work is only done if
// StreamUpdateStatus(blockId) reports true. Otherwise the routine:
//   1) returns buffered blocks older than "blockId" to the sender's free
//      block pool and clears their pending_mask bits,
//   2) clears any remaining pending_mask bits below "blockId",
//   3) keeps discarding the first pending block until
//      StreamUpdateStatus(blockId) succeeds, falling back to
//      StreamResync(blockId) when nothing is pending any more.
// If anything was discarded, the application is notified (at most once until
// notify_on_update is re-armed) and the sender's resync count is incremented.
void NormStreamObject::Prune(NormBlockId blockId, bool updateStatus)
{
    if (!updateStatus && !StreamUpdateStatus(blockId)) return;

    bool resynced = false;

    // (1) Release buffered blocks that are older than the prune point
    NormBlock* oldest = block_buffer.Find(block_buffer.RangeLo());
    while ((NULL != oldest) && (Compare(oldest->GetId(), blockId) < 0))
    {
        resynced = true;
        pending_mask.Unset(oldest->GetId().GetValue());
        block_buffer.Remove(oldest);
        sender->PutFreeBlock(oldest);
        oldest = block_buffer.Find(block_buffer.RangeLo());
    }

    // (2) Clear pending bits for blocks that had no buffered state
    NormBlockId firstPendingId;
    if (GetFirstPending(firstPendingId) && (Compare(firstPendingId, blockId) < 0))
    {
        resynced = true;
        UINT32 staleBits = (UINT32)Difference(blockId, firstPendingId);
        pending_mask.UnsetBits(firstPendingId.GetValue(), staleBits);
    }

    // (3) Make room until the status update for "blockId" is accepted
    while (!StreamUpdateStatus(blockId))
    {
        resynced = true;
        NormBlockId pendingId;
        if (!GetFirstPending(pendingId))
        {
            // Nothing left to drop, so restart stream state at "blockId"
            StreamResync(blockId);
            break;
        }
        NormBlock* pendingBlock = block_buffer.Find(pendingId);
        if (NULL != pendingBlock)
        {
            block_buffer.Remove(pendingBlock);
            sender->PutFreeBlock(pendingBlock);
        }
        pending_mask.Unset(pendingId.GetValue());
    }

    if (!resynced) return;

    if (notify_on_update)
    {
        notify_on_update = false;
        session.Notify(NormController::RX_OBJECT_UPDATED, sender, this);
    }
    sender->IncrementResyncCount();
}
// Returns true when the given block/segment position is at or beyond the
// current read position (read_index), and false when it lies behind it.
bool NormStreamObject::PassiveReadCheck(NormBlockId blockId, NormSegmentId segmentId)
{
    if (Compare(read_index.block, blockId) < 0)
        return true;   // block is ahead of the read position
    if (read_index.block == blockId)
        return (read_index.segment <= segmentId);
    return false;      // block is behind the read position
}
// Application-facing stream read.
//
// buffer       - destination for the stream bytes
// buflen       - in: bytes wanted, out: bytes delivered (may be NULL, which
//                is treated as a zero-byte request)
// seekMsgStart - when true, skip ahead to the next message start first
//
// Returns the result of ReadPrivate(). A previously detected stream break is
// reported exactly once (false with *buflen == 0) unless the caller is
// seeking a message start. After a short read, a zero-expectation probe read
// is issued so that a break following the delivered data is latched into
// stream_broken and reported by the next call.
bool NormStreamObject::Read(char* buffer, unsigned int* buflen, bool seekMsgStart)
{
    if (stream_broken && !seekMsgStart)
    {
        // Report the pending break once, then clear it
        if (NULL != buflen) *buflen = 0;
        stream_broken = false;
        return false;
    }

    unsigned int bytesWanted = 0;
    if (NULL != buflen)
        bytesWanted = *buflen;
    else
        buflen = &bytesWanted;

    const bool result = ReadPrivate(buffer, buflen, seekMsgStart);
    if (!read_ready) notify_on_update = true;

    const bool shortRead = !seekMsgStart && result &&
                           (0 != *buflen) && (*buflen < bytesWanted);
    if (shortRead)
    {
        // Probe for a break immediately after the data just returned
        char probeBuffer[8];
        unsigned int probeCount = 8;
        stream_broken = !ReadPrivate(probeBuffer, &probeCount, false);
        ASSERT(0 == probeCount);
        if (!read_ready) notify_on_update = true;
    }
    return result;
}
// Core receive-side stream read used by Read().
//
// buffer       - destination for stream bytes
// buflen       - in: number of bytes wanted, out: number of bytes copied
//                (must not be NULL here)
// seekMsgStart - when true, segments are skipped until one carrying a
//                message start marker is found, and copying begins there
//
// Returns true when the read completed without detecting a break in the
// stream (zero bytes may have been copied). Returns false when the read
// position had to be forced forward past missing data, when an invalid
// segment was encountered, or when a message start was sought but not found.
//
// The object is Retain()'d for the duration of the call because the
// stream-end paths call sender->DeleteObject(this); every exit path must
// therefore Release().
bool NormStreamObject::ReadPrivate(char* buffer, unsigned int* buflen, bool seekMsgStart)
{
    if (stream_closing || read_init)
    {
        if (stream_closing)
            PLOG(PL_DEBUG, "NormStreamObject::Read() attempted to read from closed stream\n");
        *buflen = 0;
        return seekMsgStart ? false : true;
    }
    Retain();
    unsigned int bytesRead = 0;
    unsigned int bytesToRead = *buflen;
    bool brokenStream = false;
    do
    {
        NormBlock* block = stream_buffer.Find(read_index.block);
        if (NULL == block)
        {
            // No buffered block at the read position
            PLOG(PL_DETAIL, "NormStreamObject::ReadPrivate() stream buffer empty (1) (sbEmpty:%d)\n", stream_buffer.IsEmpty());
            read_ready = false;
            *buflen = bytesRead;
            if (bytesRead > 0)
            {
                Release();
                return true;
            }
            else
            {
                // Decide whether to give up on the missing block and skip it
                bool forceForward = false;
                if (block_pool.GetCount() < block_pool_threshold)
                {
                    // Buffer resources are running low
                    forceForward = true;
                }
                else if (session.RcvrIsLowDelay())
                {
                    // Low-delay mode: skip when too far behind reception
                    if (Compare(max_pending_block, read_index.block) >= 0)
                    {
                        INT32 delta = (INT32)Difference(max_pending_block, read_index.block);
                        if (delta > session.RcvrGetMaxDelay())
                        {
                            forceForward = true;
                        }
                    }
                }
                else
                {
                    // Skip when the read position precedes the first pending block
                    NormBlockId firstPending;
                    if (GetFirstPending(firstPending))
                    {
                        if (Compare(read_index.block, firstPending) < 0)
                            forceForward = true;
                    }
                }
                if (forceForward)
                {
                    Increment(read_index.block);
                    read_index.segment = 0;
                    read_index.offset = 0;
                    if (!seekMsgStart) brokenStream = true;
                    Prune(read_index.block, false);
                    continue;
                }
                else
                {
                    Release();
                    return (seekMsgStart || brokenStream) ? false : true;
                }
            }
        }
        char* segment = block->GetSegment(read_index.segment);
        ASSERT((NULL == segment) || block->IsPending(read_index.segment));
        if (NULL == segment)
        {
            // Block is buffered but the segment at the read position is missing
            PLOG(PL_DETAIL, "NormStreamObject::ReadPrivate(%lu:%hu) stream buffer empty (read_offset>%lu) (2)\n",
                            (unsigned long)read_index.block.GetValue(), read_index.segment, (unsigned long)read_offset);
            read_ready = false;
            *buflen = bytesRead;
            if (bytesRead > 0)
            {
                Release();
                return true;
            }
            else
            {
                // Same skip decision as for a missing block (see above)
                bool forceForward = false;
                if (block_pool.GetCount() < block_pool_threshold)
                {
                    forceForward = true;
                }
                else if (session.RcvrIsLowDelay())
                {
                    if (Compare(max_pending_block, read_index.block) >= 0)
                    {
                        INT32 delta = (UINT32)Difference(max_pending_block, read_index.block);
                        if (delta > session.RcvrGetMaxDelay())
                            forceForward = true;
                    }
                }
                else
                {
                    NormBlockId firstPending;
                    if (GetFirstPending(firstPending))
                    {
                        if (Compare(read_index.block, firstPending) < 0)
                            forceForward = true;
                    }
                }
                if (forceForward)
                {
                    // Step over the missing segment; recycle the block once
                    // its last segment has been passed
                    if (++read_index.segment >= ndata)
                    {
                        stream_buffer.Remove(block);
                        block->EmptyToPool(segment_pool);
                        block_pool.Put(block);
                        Increment(read_index.block);
                        read_index.segment = 0;
                        read_index.offset = 0;
                        Prune(read_index.block, false);
                    }
                    if (!seekMsgStart) brokenStream = true;
                    continue;
                }
                else
                {
                    Release();
                    return (seekMsgStart || brokenStream) ? false : true;
                }
            }
        }
        ASSERT(NULL != segment);
        read_ready = true;
        if (brokenStream)
        {
            // Data is available again, but the caller must first learn that
            // part of the stream was skipped
            Release();
            return false;
        }
        UINT16 length = NormDataMsg::ReadStreamPayloadLength(segment);
        if (0 == length)
        {
            // Zero-length payload: the msgStart field distinguishes the case
            switch (NormDataMsg::ReadStreamPayloadMsgStart(segment))
            {
                case 0:
                    // Handled below (treated as end-of-stream)
                    break;
                default:
                    PLOG(PL_ERROR, "NormStreamObject::ReadPrivate() invalid stream control message\n");
                    if (++read_index.segment >= ndata)
                    {
                        stream_buffer.Remove(block);
                        block->EmptyToPool(segment_pool);
                        block_pool.Put(block);
                        Increment(read_index.block);
                        read_index.segment = 0;
                        read_index.offset = 0;
                    }
                    continue;
            }
        }
        else if (length > segment_size)
        {
            // Payload length exceeds what a segment can hold
            read_ready = false;
            if (bytesRead > 0)
            {
                *buflen = bytesRead;
                Release();
                return true;
            }
            else
            {
                PLOG(PL_ERROR, "NormStreamObject::ReadPrivate() node>%lu obj>%hu blk>%lu seg>%hu invalid stream segment!\n",
                                (unsigned long)LocalNodeId(), (UINT16)transport_id,
                                (unsigned long)read_index.block.GetValue(), (UINT16)read_index.segment);
                if (++read_index.segment >= ndata)
                {
                    stream_buffer.Remove(block);
                    block->EmptyToPool(segment_pool);
                    block_pool.Put(block);
                    Increment(read_index.block);
                    read_index.segment = 0;
                    read_index.offset = 0;
                    Prune(read_index.block, false);
                }
                *buflen = 0;
                Release();
                return false;
            }
        }
        UINT32 segmentOffset = NormDataMsg::ReadStreamPayloadOffset(segment);
        if ((length > 0) && (read_index.offset >= length))
        {
            // Read offset lies beyond the payload of this segment
            read_ready = false;
            if (bytesRead > 0)
            {
                *buflen = bytesRead;
                Release();
                return true;
            }
            else
            {
                // Argument list matches the eight conversions in the format
                // string (node, obj, blk, seg, offset, length, read_offset,
                // segmentOffset).
                PLOG(PL_ERROR, "NormStreamObject::ReadPrivate() node>%lu obj>%hu blk>%lu seg>%hu mangled stream! "
                               "offset:%hu length:%hu read_offset:%lu segmentOffset:%lu\n",
                                (unsigned long)LocalNodeId(), (UINT16)transport_id,
                                (unsigned long)read_index.block.GetValue(), (UINT16)read_index.segment,
                                read_index.offset, length, (unsigned long)read_offset, (unsigned long)segmentOffset);
                // Re-align the running offset with the segment's own offset
                read_offset = segmentOffset;
                read_index.offset = 0;
                *buflen = 0;
                Release();
                return false;
            }
        }
        if (seekMsgStart)
        {
            UINT16 msgStart = NormDataMsg::ReadStreamPayloadMsgStart(segment);
            if (0 == msgStart)
            {
                // No message begins in this segment
                if (0 == NormDataMsg::ReadStreamPayloadLength(segment))
                {
                    // Zero length with zero msgStart marks end of stream
                    PLOG(PL_DEBUG, "NormStreamObject::ReadPrivate() stream ended by sender 1\n");
                    session.Notify(NormController::RX_OBJECT_COMPLETED, sender, this);
                    stream_closing = true;
                    sender->DeleteObject(this);
                }
                // Skip this segment and keep looking
                block->UnsetPending(read_index.segment++);
                read_index.offset = 0;
                if (read_index.segment >= ndata)
                {
                    stream_buffer.Remove(block);
                    block->EmptyToPool(segment_pool);
                    block_pool.Put(block);
                    Increment(read_index.block);
                    read_index.segment = 0;
                    Prune(read_index.block, false);
                }
                continue;
            }
            else
            {
                // msgStart is stored one-based; position on the message start
                read_offset += (msgStart - 1);
                read_index.offset = (msgStart - 1);
                seekMsgStart = false;
            }
        }
        // Copy as much of the remaining payload as the caller wants
        UINT16 count = length - read_index.offset;
        count = MIN(count, bytesToRead);
#ifdef SIMULATE
        UINT16 simCount = read_index.offset + count + NormDataMsg::GetStreamPayloadHeaderLength();
        simCount = (simCount < SIM_PAYLOAD_MAX) ? (SIM_PAYLOAD_MAX - simCount) : 0;
        memcpy(buffer+bytesRead, segment+read_index.offset+NormDataMsg::GetStreamPayloadHeaderLength(), simCount);
#else
        memcpy(buffer+bytesRead, segment+read_index.offset+NormDataMsg::GetStreamPayloadHeaderLength(), count);
#endif // if/else SIMULATE
        read_index.offset += count;
        bytesRead += count;
        read_offset += count;
        bytesToRead -= count;
        if (read_index.offset >= length)
        {
            // Segment fully consumed; advance to the next one
            bool streamEnded = (0 == NormDataMsg::ReadStreamPayloadLength(segment));
            block->UnsetPending(read_index.segment++);
            read_index.offset = 0;
            if (read_index.segment >= ndata)
            {
                // Block fully consumed; recycle it
                stream_buffer.Remove(block);
                block->EmptyToPool(segment_pool);
                block_pool.Put(block);
                Increment(read_index.block);
                read_index.segment = 0;
                Prune(read_index.block, false);
                if (0 == bytesToRead)
                    read_ready = DetermineReadReadiness();
            }
            else
            {
                if (0 == bytesToRead)
                    read_ready = (NULL != block->GetSegment(read_index.segment));
            }
            if (streamEnded)
            {
                PLOG(PL_DEBUG, "NormStreamObject::ReadPrivate() stream ended by sender 2\n");
                session.Notify(NormController::RX_OBJECT_COMPLETED, sender, this);
                stream_closing = true;
                sender->DeleteObject(this);
            }
        }
    } while ((bytesToRead > 0) || seekMsgStart);
    *buflen = bytesRead;
    Release();
    return true;
}
void NormStreamObject::Terminate()
{
Flush(); stream_closing = true;
NormBlock* block = stream_buffer.Find(write_index.block);
if (NULL == block)
{
if (NULL == (block = block_pool.Get()))
{
block = stream_buffer.Find(stream_buffer.RangeLo());
ASSERT(NULL != block);
if (block->IsPending())
{
NormBlockId blockId = block->GetId();
pending_mask.Unset(blockId.GetValue());
repair_mask.Unset(blockId.GetValue());
NormBlock* b = FindBlock(blockId);
if (b)
{
block_buffer.Remove(b);
session.SenderPutFreeBlock(b);
}
if (!pending_mask.IsSet())
{
pending_mask.Set(write_index.block.GetValue());
stream_next_id = write_index.block;
Increment(stream_next_id);
}
}
stream_buffer.Remove(block);
block->EmptyToPool(segment_pool);
}
block->SetId(write_index.block);
block->ClearPending();
bool success = stream_buffer.Insert(block);
ASSERT(success);
} char* segment = block->GetSegment(write_index.segment);
if (NULL == segment)
{
if (NULL == (segment = segment_pool.Get()))
{
NormBlock* b = stream_buffer.Find(stream_buffer.RangeLo());
ASSERT(b != block);
if (b->IsPending())
{
NormBlockId blockId = b->GetId();
pending_mask.Unset(blockId.GetValue());
repair_mask.Unset(blockId.GetValue());
NormBlock* c = FindBlock(blockId);
if (c)
{
block_buffer.Remove(c);
session.SenderPutFreeBlock(c);
}
if (!pending_mask.IsSet())
{
pending_mask.Set(write_index.block.GetValue());
stream_next_id = write_index.block;
Increment(stream_next_id);
}
}
stream_buffer.Remove(b);
b->EmptyToPool(segment_pool);
block_pool.Put(b);
segment = segment_pool.Get();
ASSERT(NULL != segment);
}
block->AttachSegment(write_index.segment, segment);
NormDataMsg::WriteStreamPayloadMsgStart(segment, 0);
NormDataMsg::WriteStreamPayloadLength(segment, 0);
}
else
{
}
NormDataMsg::WriteStreamPayloadOffset(segment, write_offset);
block->SetPending(write_index.segment);
if (++write_index.segment >= ndata)
{
Increment(write_index.block);
write_index.segment = 0;
}
flush_pending = true;
session.TouchSender();
}
unsigned int NormStreamObject::GetVacancy(unsigned int wanted)
{
ASSERT(Compare(write_index.block, tx_index.block) >= 0);
unsigned int maxDelta = block_pool.GetTotal() >> 1;
UINT32 blockDelta = (UINT32)Difference(write_index.block, tx_index.block);
if (blockDelta > maxDelta) return 0;
UINT32 nBytes = 0;
NormBlock* block = stream_buffer.Find(write_index.block);
if (NULL != block)
{
char* segment = block->GetSegment(write_index.segment);
if (NULL != segment)
{
UINT16 index = NormDataMsg::ReadStreamPayloadLength(segment);
nBytes += segment_size - index;
}
else
{
nBytes += segment_size;
}
nBytes += (ndata - write_index.segment - 1) * segment_size;
}
unsigned int blocksAllowed = maxDelta - blockDelta;
unsigned int poolCount = block_pool.GetCount();
if (poolCount >= blocksAllowed)
poolCount = blocksAllowed;
nBytes += poolCount * ndata * segment_size;
NormBlockBuffer::Iterator iterator(block_buffer);
while ((NULL != (block = iterator.GetNextBlock())) &&
(blocksAllowed > 0) &&
((0 == wanted) || (nBytes < wanted)))
{
double delay = session.GetFlowControlDelay() - block->GetNackAge();
if (block->IsPending() || (delay >= 1.0e-06)) break;
nBytes += (segment_size * ndata);
blocksAllowed--;
}
return nBytes;
}
// Sender-side stream write.
//
// buffer - bytes to append to the stream
// len    - number of bytes in "buffer"
// eom    - true when the end of "buffer" is an application message boundary
//
// Returns the number of bytes actually accepted, which is less than "len"
// when the stream buffer is full (and push_mode is off) or zero when the
// stream is closing. Data is packed into segments of at most segment_size
// bytes; each segment's payload header carries its length, stream offset and
// the (one-based) position of the first message start it contains. A segment
// is queued for transmission when it fills up, or at the end of the write
// when flush_mode is not FLUSH_NONE.
UINT32 NormStreamObject::Write(const char* buffer, UINT32 len, bool eom)
{
    UINT32 nBytes = 0;
    do
    {
        if (stream_closing)
        {
            if (0 != len)
            {
                PLOG(PL_ERROR, "NormStreamObject::Write() error: stream is closing (len:%lu eom:%d)\n",
                               (unsigned long)len, eom);
                len = 0;
            }
            break;
        }
        // Limit how far writing may run ahead of transmission
        ASSERT(Compare(write_index.block, tx_index.block) >= 0);
        UINT32 deltaBlock = (UINT32)Difference(write_index.block, tx_index.block);
        if (deltaBlock > (block_pool.GetTotal() >> 1))
        {
            write_vacancy = false;
            PLOG(PL_DEBUG, "NormStreamObject::Write() stream buffer full (1)\n");
            if (!push_mode) break;
        }
        // Locate (or create) the block at the write position
        NormBlock* block = stream_buffer.Find(write_index.block);
        if (NULL == block)
        {
            block = block_pool.Get();
            if (NULL == block)
            {
                // Pool exhausted: consider reclaiming the oldest block
                block = stream_buffer.Find(stream_buffer.RangeLo());
                ASSERT(NULL != block);
                double delay = session.GetFlowControlDelay() - block->GetNackAge();
                if (block->IsPending() || (delay >= 1.0e-06))
                {
                    // Oldest block is still pending or inside the flow
                    // control delay
                    write_vacancy = false;
                    if (push_mode)
                    {
                        // Push mode overwrites it: drop its tx/repair state
                        NormBlockId blockId = block->GetId();
                        pending_mask.Unset(blockId.GetValue());
                        repair_mask.Unset(blockId.GetValue());
                        NormBlock* b = FindBlock(blockId);
                        if (b)
                        {
                            block_buffer.Remove(b);
                            session.SenderPutFreeBlock(b);
                        }
                        if (!pending_mask.IsSet())
                        {
                            pending_mask.Set(write_index.block.GetValue());
                            stream_next_id = write_index.block;
                            Increment(stream_next_id);
                        }
                    }
                    else
                    {
                        if (!block->IsPending())
                        {
                            // Only the flow control delay holds us back
                            PLOG(PL_DEBUG, "NormStreamObject::Write() asserting flow control for stream (postedEmpty:%d)\n",
                                            session.GetPostedTxQueueEmpty());
                            if (session.GetPostedTxQueueEmpty())
                                session.ActivateFlowControl(delay, GetId(), NormController::TX_QUEUE_EMPTY);
                            else
                                session.ActivateFlowControl(delay, GetId(), NormController::TX_QUEUE_VACANCY);
                        }
                        PLOG(PL_DEBUG, "NormStreamObject::Write() stream buffer full (2) len:%lu eom:%d\n",
                                       (unsigned long)len, eom);
                        break;
                    }
                }
                stream_buffer.Remove(block);
                block->EmptyToPool(segment_pool);
            }
            block->SetId(write_index.block);
            block->ClearPending();
            bool success = stream_buffer.Insert(block);
            ASSERT(success);
        }
        // Locate (or create) the segment at the write position
        char* segment = block->GetSegment(write_index.segment);
        if (NULL == segment)
        {
            if (NULL == (segment = segment_pool.Get()))
            {
                // Segment pool exhausted: reclaim the oldest block's segments
                NormBlock* b = stream_buffer.Find(stream_buffer.RangeLo());
                ASSERT(b != block);
                if (b->IsPending())
                {
                    write_vacancy = false;
                    if (push_mode)
                    {
                        NormBlockId blockId = b->GetId();
                        pending_mask.Unset(blockId.GetValue());
                        repair_mask.Unset(blockId.GetValue());
                        NormBlock* c = FindBlock(blockId);
                        if (c)
                        {
                            block_buffer.Remove(c);
                            session.SenderPutFreeBlock(c);
                        }
                        if (!pending_mask.IsSet())
                        {
                            pending_mask.Set(write_index.block.GetValue());
                            stream_next_id = write_index.block;
                            Increment(stream_next_id);
                        }
                    }
                    else
                    {
                        PLOG(PL_DEBUG, "NormStreamObject::Write() stream buffer full (3)\n");
                        break;
                    }
                }
                stream_buffer.Remove(b);
                b->EmptyToPool(segment_pool);
                block_pool.Put(b);
                segment = segment_pool.Get();
                ASSERT(NULL != segment);
            }
            // Initialize the payload header of the fresh segment
            NormDataMsg::WriteStreamPayloadMsgStart(segment, 0);
            NormDataMsg::WriteStreamPayloadLength(segment, 0);
            NormDataMsg::WriteStreamPayloadOffset(segment, write_offset);
            block->AttachSegment(write_index.segment, segment);
        }
        // "index" is the number of payload bytes already in the segment
        UINT16 index = NormDataMsg::ReadStreamPayloadLength(segment);
        if (msg_start && (0 != len))
        {
            // Record the first message start in this segment (one-based)
            if (0 == NormDataMsg::ReadStreamPayloadMsgStart(segment))
                NormDataMsg::WriteStreamPayloadMsgStart(segment, index+1);
            msg_start = false;
        }
        UINT32 count = len - nBytes;
        UINT32 space = (UINT32)(segment_size - index);
        count = MIN(count, space);
#ifdef SIMULATE
        UINT32 simCount = index + NormDataMsg::GetStreamPayloadHeaderLength();
        simCount = (simCount < SIM_PAYLOAD_MAX) ? (SIM_PAYLOAD_MAX - simCount) : 0;
        simCount = MIN(count, simCount);
        memcpy(segment+index+NormDataMsg::GetStreamPayloadHeaderLength(), buffer+nBytes, simCount);
#else
        memcpy(segment+index+NormDataMsg::GetStreamPayloadHeaderLength(), buffer+nBytes, count);
#endif // if/else SIMULATE
        // Must be an ordinary statement: the header length has to reflect
        // the bytes just appended, otherwise later writes restart at "index"
        NormDataMsg::WriteStreamPayloadLength(segment, index+count);
        nBytes += count;
        write_offset += count;
        // Queue the segment when it is full, or when flushing and this was
        // the last of the caller's data
        if ((count == space) ||
            ((FLUSH_NONE != flush_mode) && (nBytes == len) && ((0 != index) || (0 != len))))
        {
            block->SetPending(write_index.segment);
            if (++write_index.segment >= ndata)
            {
                // Block complete: timestamp it and move to the next block
                ProtoTime currentTime;
                currentTime.GetCurrentTime();
                block->SetLastNackTime(currentTime);
                Increment(write_index.block);
                write_index.segment = 0;
            }
        }
    } while (nBytes < len);
    if (nBytes == len)
    {
        // Everything was accepted
        if (eom)
            msg_start = true;
        if (FLUSH_ACTIVE == flush_mode)
            flush_pending = true;
        else if (!stream_closing)
            flush_pending = false;
        if ((0 != nBytes) || (FLUSH_NONE != flush_mode))
        {
            session.TouchSender();
        }
    }
    else
    {
        // Partial write: still prompt the sender to service what was queued
        session.TouchSender();
    }
    return nBytes;
}
#ifdef SIMULATE
// Simulation-only object type (compiled under SIMULATE). It is constructed
// as a NormObject of type FILE and adds no state of its own.
NormSimObject::NormSimObject(class NormSession&    theSession,
                             class NormSenderNode* theSender,
                             const NormObjectId&   objectId)
    : NormObject(FILE, theSession, theSender, objectId)
{
}

// Nothing to release beyond what NormObject's destructor handles.
NormSimObject::~NormSimObject()
{
}
// Opens the simulated object.
//
// When the object has a sender (i.e. "sender" is non-NULL) there is nothing
// to do here and true is returned. Otherwise NormObject::Open() is invoked
// with the session's current sender segment size and FEC parameters.
bool NormSimObject::Open(UINT32      objectSize,
                         const char* infoPtr,
                         UINT16      infoLen)
{
    if (sender) return true;
    return NormObject::Open(NormObjectSize(objectSize),
                            infoPtr, infoLen,
                            session.SenderSegmentSize(),
                            session.GetSenderFecId(),
                            session.GetSenderFecFieldSize(),
                            session.SenderBlockSize(),
                            session.SenderNumParity());
}
// Reports the length of the requested segment without producing any data;
// "buffer" is not written. Every segment is segment_size bytes long except
// the last segment of the final block, which is final_segment_size bytes.
UINT16 NormSimObject::ReadSegment(NormBlockId   blockId,
                                  NormSegmentId segmentId,
                                  char*         buffer)
{
    const bool isFinalSegment = (blockId == final_block_id) &&
                                (segmentId == (GetBlockSize(blockId)-1));
    UINT16 len = isFinalSegment ? final_segment_size : segment_size;
    return len;
}
// Returns the sender's retrieval segment, or NULL when the object has no
// sender. The block and segment ids are not used.
char* NormSimObject::RetrieveSegment(NormBlockId   blockId,
                                     NormSegmentId segmentId)
{
    if (!sender) return NULL;
    return sender->GetRetrievalSegment();
}
#endif
// Constructs an empty, uninitialized table; Init() must be called before
// objects can be inserted. All range/count bookkeeping starts at zero in
// both the hash-table and USE_PROTO_TREE builds.
NormObjectTable::NormObjectTable()
#ifndef USE_PROTO_TREE
 : table((NormObject**)NULL),
#else
 :
#endif // if/else !USE_PROTO_TREE
   range_max(0), range(0),
   count(0), size(0)
{
}

// Releases every object still held and frees the table storage.
NormObjectTable::~NormObjectTable()
{
    Destroy();
}
// (Re)initializes the table.
//
// rangeMax  - maximum span of object ids the table may cover at once
// tableSize - requested number of hash buckets (unused with USE_PROTO_TREE)
//
// Any previous content is destroyed first. Returns false when either
// argument is zero or the bucket array cannot be allocated.
bool NormObjectTable::Init(UINT16 rangeMax, UINT16 tableSize)
{
    Destroy();
    if (!rangeMax || !tableSize) return false;
#ifndef USE_PROTO_TREE
    // NOTE(review): hash_mask below is used as a bit mask, which presumes a
    // power-of-two bucket count. This adjustment does not guarantee that for
    // sizes that are not a multiple of 8 - confirm callers only pass such
    // sizes.
    if (0 != (tableSize & 0x07)) tableSize = (tableSize >> 3) + 1;
    if (!(table = new NormObject*[tableSize]))
    {
        PLOG(PL_FATAL, "NormObjectTable::Init() table allocation error: %s\n", GetErrorString());
        return false;
    }
    memset(table, 0, tableSize*sizeof(NormObject*));
    hash_mask = tableSize - 1;
#endif // !USE_PROTO_TREE
    range_max = rangeMax;
    count = range = 0;
    size = NormObjectSize(0);
    return true;
}
// Changes the maximum id span of the table.
//
// When the limit shrinks, objects are deleted until the current range fits:
// transmit objects (no sender) are deleted through the session, receive
// objects are reported as RX_OBJECT_ABORTED and deleted through their
// sender. For receive objects the newest entry (range_hi) is chosen unless
// the session's receiver is silent, in which case the oldest (range_lo) is
// dropped.
void NormObjectTable::SetRangeMax(UINT16 rangeMax)
{
    if (rangeMax < range_max)
    {
        while (range > rangeMax)
        {
            NormObject* victim = Find(range_lo);
            ASSERT(NULL != victim);
            NormSenderNode* owner = victim->GetSender();
            NormSession& owningSession = victim->GetSession();
            if (NULL != owner)
            {
                // Receive object: abort it and let the sender node delete it
                if (!owningSession.ReceiverIsSilent()) victim = Find(range_hi);
                owningSession.Notify(NormController::RX_OBJECT_ABORTED, owner, victim);
                owner->DeleteObject(victim);
                continue;
            }
            // Transmit object
            owningSession.DeleteTxObject(victim, true);
        }
    }
    range_max = rangeMax;
}
#ifdef USE_PROTO_TREE
// Looks up an object by id (tree-backed build). Returns NULL when the table
// is empty, the id is outside [range_lo, range_hi], or no entry matches.
NormObject* NormObjectTable::Find(const NormObjectId& objectId) const
{
    if (0 == range) return NULL;
    if (objectId < range_lo) return NULL;
    if (objectId > range_hi) return NULL;
    return tree.Find(objectId.GetValuePtr(), 8*sizeof(UINT16));
}
// Empties the table (tree-backed build): each object is removed, starting
// from range_lo, and then released once more before the bookkeeping is
// reset to zero.
void NormObjectTable::Destroy()
{
    for (NormObject* obj = Find(range_lo); NULL != obj; obj = Find(range_lo))
    {
        Remove(obj);
        obj->Release();
    }
    count = range = range_max = 0;
}
#else
// Looks up an object by id (hash-table build). The bucket is selected by
// masking the id with hash_mask and its chain is walked until the id
// matches. Returns NULL when the table is empty, the id is outside
// [range_lo, range_hi], or the chain holds no match.
NormObject* NormObjectTable::Find(const NormObjectId& objectId) const
{
    if (0 == range)
        return (NormObject*)NULL;
    if ((objectId < range_lo) || (objectId > range_hi))
        return (NormObject*)NULL;
    NormObject* candidate = table[((UINT16)objectId) & hash_mask];
    while ((NULL != candidate) && (objectId != candidate->GetId()))
        candidate = candidate->next;
    return candidate;
}
// Empties the table and frees the bucket array (hash-table build). Does
// nothing when the table was never allocated. Each object is removed,
// starting from range_lo, and then released once more.
void NormObjectTable::Destroy()
{
    if (NULL == table) return;

    NormObject* obj = Find(range_lo);
    while (NULL != obj)
    {
        Remove(obj);
        obj->Release();
        obj = Find(range_lo);
    }
    delete[] table;
    table = (NormObject**)NULL;
    count = range = range_max = 0;
}
#endif
// Tells whether an object with the given id could be inserted without the
// table's id span exceeding range_max. An empty table accepts any id, as
// does an id already inside [range_lo, range_hi].
bool NormObjectTable::CanInsert(NormObjectId objectId) const
{
    if (0 == range) return true;

    if (objectId < range_lo)
    {
        // Would extend the range downwards
        return !((range_lo - objectId + range) > range_max);
    }
    if (objectId > range_hi)
    {
        // Would extend the range upwards
        return !((objectId - range_hi + range) > range_max);
    }
    return true;
}
// Adds an object to the table.
//
// The table's id range is widened as needed; the insert is refused (false)
// when that would make the range exceed range_max. On success the object
// count and aggregate size are updated and the table takes a reference on
// the object (Retain()), which Remove() gives back.
bool NormObjectTable::Insert(NormObject* theObject)
{
    const NormObjectId& objectId = theObject->GetId();
    if (0 == range)
    {
        // First entry defines the range
        range_lo = range_hi = objectId;
        range = 1;
    }
    else if (objectId < range_lo)
    {
        UINT16 newRange = range_lo - objectId + range;
        if (newRange > range_max) return false;
        range_lo = objectId;
        ASSERT(range_lo <= range_hi);
        range = newRange;
    }
    else if (objectId > range_hi)
    {
        UINT16 newRange = objectId - range_hi + range;
        if (newRange > range_max) return false;
        range_hi = objectId;
        ASSERT(range_lo <= range_hi);
        range = newRange;
    }
#ifdef USE_PROTO_TREE
    ASSERT(NULL == Find(theObject->GetId()));
    tree.Insert(*theObject);
#else
    // Keep each bucket chain ordered by ascending object id
    UINT16 index = ((UINT16)objectId) & hash_mask;
    NormObject* prev = NULL;
    NormObject* entry = table[index];
    while (entry && (entry->GetId() < objectId))
    {
        prev = entry;
        entry = entry->next;
    }
    if (prev)
        prev->next = theObject;
    else
        table[index] = theObject;
    ASSERT(((NULL != entry) ? (objectId != entry->GetId()) : true));
    theObject->next = entry;
#endif // if/else USE_PROTO_TREE
    count++;
    size = size + theObject->GetSize();
    theObject->Retain();
    return true;
}
#ifdef USE_PROTO_TREE
bool NormObjectTable::Remove(NormObject* theObject)
{
ASSERT(NULL != theObject);
const NormObjectId& objectId = theObject->GetId();
if (range)
{
if ((objectId < range_lo) || (objectId > range_hi)) return false;
if (range > 1)
{
if (objectId == range_lo)
{
const NormObject* next = static_cast<const NormObject*>(theObject->GetNext());
if (NULL == next) next = static_cast<const NormObject*>(tree.GetHead());
ASSERT(NULL != next);
range_lo = next->GetId();
ASSERT(range_lo <= range_hi);
range = range_hi - range_lo + 1;
}
else if (objectId == range_hi)
{
const NormObject* prev = static_cast<const NormObject*>(theObject->GetPrev());
if (NULL == prev) prev = static_cast<const NormObject*>(tree.GetTail());
ASSERT(NULL != prev);
range_hi = prev->GetId();
ASSERT(range_lo <= range_hi);
range = range_hi - range_lo + 1;
}
}
else
{
range = 0;
}
ASSERT(NULL != tree.Find(theObject->GetId().GetValuePtr(), 8*sizeof(UINT16)));
tree.Remove(*theObject);
count--;
size = size - theObject->GetSize();
theObject->Release();
return true;
}
else
{
return false;
}
} #else
bool NormObjectTable::Remove(NormObject* theObject)
{
ASSERT(NULL != theObject);
const NormObjectId& objectId = theObject->GetId();
if (range)
{
if ((objectId < range_lo) || (objectId > range_hi)) return false;
UINT16 index = ((UINT16)objectId) & hash_mask;
NormObject* prev = NULL;
NormObject* entry = table[index];
while (entry && (entry->GetId() != objectId))
{
prev = entry;
entry = entry->next;
}
if (entry != theObject) return false;
if (prev)
prev->next = entry->next;
else
table[index] = entry->next;
if (range > 1)
{
if (objectId == range_lo)
{
UINT16 i = index;
UINT16 endex;
if (range <= hash_mask)
endex = (index + range - 1) & hash_mask;
else
endex = index;
entry = NULL;
UINT16 offset = 0;
NormObjectId nextId = range_hi;
do
{
++i &= hash_mask;
offset++;
if ((entry = table[i]))
{
NormObjectId id = (UINT16)objectId + offset;
while(entry && (entry->GetId() != id))
{
if ((entry->GetId() > objectId) &&
(entry->GetId() < nextId)) nextId = entry->GetId();
entry = entry->next;
}
if (entry) break;
}
} while (i != endex);
if (entry)
range_lo = entry->GetId();
else
range_lo = nextId;
range = range_hi - range_lo + 1;
}
else if (objectId == range_hi)
{
UINT16 i = index;
UINT16 endex;
if (range <= hash_mask)
endex = (index - range + 1) & hash_mask;
else
endex = index;
entry = NULL;
UINT16 offset = 0;
NormObjectId prevId = range_lo;
do
{
--i &= hash_mask;
offset++;
if ((entry = table[i]))
{
NormObjectId id = (UINT16)objectId - offset;
while(entry && (entry->GetId() != id))
{
if ((entry->GetId() < objectId) &&
(entry->GetId() > prevId)) prevId = entry->GetId();
entry = entry->next;
}
if (entry) break;
}
} while (i != endex);
if (entry)
range_hi = entry->GetId();
else
range_hi = prevId;
range = range_hi - range_lo + 1;
}
}
else
{
range = 0;
}
count--;
size = size - theObject->GetSize();
theObject->Release();
return true;
}
else
{
return false;
}
} #endif
#ifdef USE_PROTO_TREE
// Creates an iterator over "objectTable" (tree-backed build), positioned at
// the table's range_lo entry. The first item is fetched up front so that
// GetNextObject() can always look one entry ahead.
NormObjectTable::Iterator::Iterator(NormObjectTable& objectTable)
 : table(objectTable), iterator(objectTable.tree, false, objectTable.range_lo.GetValuePtr(), 8*sizeof(UINT16))
{
    next_object = iterator.GetNextItem();
    ASSERT((NULL == next_object) || (objectTable.range_lo == next_object->GetId()));
}
// Rewinds the iterator (tree-backed build) to the table's range_lo entry
// for forward iteration and pre-fetches the first item.
void NormObjectTable::Iterator::Reset()
{
    iterator.Reset(false, table.range_lo.GetValuePtr(), 8*sizeof(UINT16));
    next_object = iterator.GetNextItem();
    ASSERT(table.IsEmpty() || (NULL != next_object));
    ASSERT((NULL == next_object) || (table.range_lo == next_object->GetId()));
}
// Returns the next object in ascending id order (tree-backed build), or
// NULL when the iteration is finished.
//
// The iterator keeps one entry of look-ahead in next_object. When the
// underlying tree iterator runs off its end, it is reset so that iteration
// wraps to the start of the tree; the iteration is considered finished once
// the look-ahead entry's id is not greater than the one being returned.
NormObject* NormObjectTable::Iterator::GetNextObject()
{
    NormObject* nextObj = next_object;
    if (NULL != nextObj)
    {
        next_object = iterator.GetNextItem();
        if (NULL == next_object)
        {
            // End of tree reached: wrap around
            iterator.Reset();
            next_object = iterator.GetNextItem();
        }
        // The tree may have been emptied since the last call, so the
        // look-ahead can still be NULL here.
        if ((NULL != next_object) && (next_object->GetId() <= nextObj->GetId()))
            next_object = NULL;
    }
    return nextObj;
}
// Returns the next object in descending id order (tree-backed build), or
// NULL when the iteration is finished.
//
// On the first call after forward use, the iterator is re-seated in
// reverse at the table's range_hi entry. One entry of look-ahead is kept in
// next_object; when the tree iterator runs off its end it is reset so that
// iteration wraps, and the iteration is considered finished once the
// look-ahead entry's id is not smaller than the one being returned.
NormObject* NormObjectTable::Iterator::GetPrevObject()
{
    if (!iterator.IsReversed())
    {
        iterator.Reset(true, table.range_hi.GetValuePtr(), 8*sizeof(UINT16));
        next_object = iterator.GetPrevItem();
    }
    NormObject* prevObj = next_object;
    if (NULL != prevObj)
    {
        next_object = iterator.GetPrevItem();
        if (NULL == next_object)
        {
            // Start of tree reached: wrap around
            iterator.Reset(true);
            next_object = iterator.GetPrevItem();
        }
        // The tree may have been emptied since the last call, so the
        // look-ahead can still be NULL here.
        if ((NULL != next_object) && (next_object->GetId() >= prevObj->GetId()))
            next_object = NULL;
    }
    return prevObj;
}
#else
// Creates an iterator over "objectTable" (hash-table build). The iterator
// starts in the "reset" state, so the first GetNextObject()/GetPrevObject()
// call begins at the corresponding end of the table's range.
NormObjectTable::Iterator::Iterator(const NormObjectTable& objectTable)
    : table(objectTable),
      reset(true)
{
}
// Returns the next object in ascending id order (hash-table build), or
// NULL when the table is empty or the iteration has passed range_hi.
//
// The first call after construction returns the range_lo entry. Later calls
// scan the buckets upward from the current id's bucket: an exact hit on a
// consecutive id is returned immediately; if one full pass over the buckets
// finds none, the smallest id above the current one seen during the pass is
// used instead.
NormObject* NormObjectTable::Iterator::GetNextObject()
{
    if (reset)
    {
        if (table.range)
        {
            reset = false;
            index = table.range_lo;
            return table.Find(index);
        }
        else
        {
            return (NormObject*)NULL;
        }
    }
    else
    {
        if (table.range &&
            (index < table.range_hi) &&
            (index >= table.range_lo))
        {
            UINT16 i = index;
            UINT16 endex;
            if ((UINT16)(table.range_hi - index) <= table.hash_mask)
                endex = table.range_hi & table.hash_mask;
            else
                endex = i & table.hash_mask;  // bucket index, so the scan ends after one full pass
            UINT16 offset = 0;
            NormObjectId nextId = table.range_hi;
            do
            {
                ++i &= table.hash_mask;
                offset++;
                NormObjectId id = (UINT16)index + offset;
                NormObject* entry = table.table[i];
                while ((NULL != entry) && (entry->GetId() != id))
                {
                    // Remember the smallest id above the current position
                    if ((entry->GetId() > index) && (entry->GetId() < nextId))
                        nextId = entry->GetId();
                    entry = table.Next(entry);
                }
                if (entry)
                {
                    index = entry->GetId();
                    return entry;
                }
            } while (i != endex);
            index = nextId;
            return table.Find(nextId);
        }
        else
        {
            return (NormObject*)NULL;
        }
    }
}
// Returns the next object in descending id order (hash-table build), or
// NULL when the table is empty or the iteration has passed range_lo.
//
// The first call after construction returns the range_hi entry. Later calls
// scan the buckets downward from the current id's bucket: an exact hit on a
// consecutive id is returned immediately; if one full pass over the buckets
// finds none, the largest id below the current one seen during the pass is
// used instead (mirroring the range_hi search in Remove()).
NormObject* NormObjectTable::Iterator::GetPrevObject()
{
    if (reset)
    {
        if (table.range)
        {
            reset = false;
            index = table.range_hi;
            return table.Find(index);
        }
        else
        {
            return (NormObject*)NULL;
        }
    }
    else
    {
        if (table.range &&
            (index <= table.range_hi) &&
            (index > table.range_lo))
        {
            UINT16 i = index;
            UINT16 endex;
            if ((UINT16)(index - table.range_lo) <= table.hash_mask)
                endex = table.range_lo & table.hash_mask;
            else
                endex = i & table.hash_mask;  // bucket index, so the scan ends after one full pass
            // "offset" counts down from zero; as a UINT16 it wraps, so
            // (index + offset) truncated to 16 bits walks index-1, index-2, ...
            UINT16 offset = 0;
            NormObjectId prevId = table.range_lo;
            do
            {
                --i &= table.hash_mask;
                offset--;
                NormObjectId id = (UINT16)index + offset;
                NormObject* entry = table.table[i];
                while ((NULL != entry ) && (entry->GetId() != id))
                {
                    // Remember the largest id below the current position
                    if ((entry->GetId() < index) && (entry->GetId() > prevId))
                        prevId = entry->GetId();
                    entry = table.Next(entry);
                }
                if (entry)
                {
                    index = entry->GetId();
                    return entry;
                }
            } while (i != endex);
            index = prevId;
            return table.Find(prevId);
        }
        else
        {
            return (NormObject*)NULL;
        }
    }
}
#endif