#include "processor/operator/limit.h"
#include "common/metric.h"
namespace lbug {
namespace processor {
// Builds the human-readable description of this operator: the fixed
// "Limit: " prefix followed by the decimal rendering of limitNum.
std::string LimitPrintInfo::toString() const {
    std::string description{"Limit: "};
    description += std::to_string(limitNum);
    return description;
}
// Pulls the next batch from the first child and caps the output at limitNumber.
//
// Returns false when the child produces nothing more, or when the tuples
// accounted for before this batch already leave no room under the limit.
// Returns true otherwise; when the current batch would cross the limit, the
// selection size of the data chunk at dataChunkToSelectPos is shrunk so that
// only the remaining allowance is exposed.
bool Limit::getNextTuplesInternal(ExecutionContext* context) {
    // No more input from the child: nothing to emit.
    if (!children[0]->getNextTuple(context)) {
        return false;
    }

    auto batchSize = resultSet->getNumTuples(dataChunksPosInScope);
    // The value returned by fetch_add is used as the number of tuples
    // accounted for before this batch was added to the counter.
    auto countedBefore = counter->fetch_add(batchSize);

    // Whole batch fits under the limit: pass it through unchanged.
    if (!(countedBefore + batchSize > limitNumber)) {
        metrics->numOutputTuple.increase(batchSize);
        return true;
    }

    // The batch crosses the limit. The difference is deliberately stored in a
    // signed 64-bit integer so that a non-positive result signals that the
    // limit was already reached before this batch.
    int64_t remainingAllowance = limitNumber - countedBefore;
    if (remainingAllowance <= 0) {
        return false;
    }

    // Truncate the selected chunk to the remaining allowance. The chunk is
    // asserted to be unflat before its selection size is changed.
    auto& dataChunkToSelect = resultSet->dataChunks[dataChunkToSelectPos];
    DASSERT(!dataChunkToSelect->state->isFlat());
    dataChunkToSelect->state->getSelVectorUnsafe().setSelSize(remainingAllowance);
    metrics->numOutputTuple.increase(remainingAllowance);
    return true;
}
} }