#include "processor/operator/cross_product.h"
#include "common/metric.h"
namespace lbug {
namespace processor {
// Prepares this operator's per-thread state before any tuples are produced.
//
// For every position listed in info.outVecPos, the matching value vector is looked up in
// `resultSet` and its raw pointer is appended to vectorsToScan; afterwards localState.init()
// is invoked. The ExecutionContext argument is not used here.
void CrossProduct::initLocalStateInternal(ResultSet* resultSet, ExecutionContext* /*context*/) {
    for (auto& outPos : info.outVecPos) {
        // Only the raw pointer is stored; ownership stays with whatever getValueVector()
        // hands out (presumably the result set -- confirm against ResultSet).
        auto outVector = resultSet->getValueVector(outPos).get();
        vectorsToScan.push_back(outVector);
    }
    localState.init();
}
// Emits the next batch of tuples scanned from the locally held table.
//
// Returns false when the table is empty, or when the table cursor has reached the end and
// children[0] has no further tuple to offer. Otherwise scans up to localState.maxMorselSize
// tuples starting at localState.startIdx into vectorsToScan, advances the cursor, records the
// batch size in the output-tuple metric, and returns true.
bool CrossProduct::getNextTuplesInternal(ExecutionContext* context) {
    auto scanTable = localState.table.get();

    // Nothing can be produced from an empty table.
    if (scanTable->getNumTuples() == 0) {
        return false;
    }

    // When the cursor sits at the end of the table, the child must supply a new tuple before
    // the table is scanned again from the beginning.
    const bool tableExhausted = (localState.startIdx == scanTable->getNumTuples());
    if (tableExhausted) {
        const bool childAdvanced = children[0]->getNextTuple(context);
        if (!childAdvanced) {
            return false;
        }
        localState.startIdx = 0;
    }

    // Cap the batch at the configured morsel size and at the number of tuples still unread.
    const auto tuplesRemaining = scanTable->getNumTuples() - localState.startIdx;
    const auto batchSize = std::min(localState.maxMorselSize, tuplesRemaining);

    scanTable->scan(vectorsToScan, localState.startIdx, batchSize, info.colIndicesToScan);
    localState.startIdx += batchSize;
    metrics->numOutputTuple.increase(batchSize);
    return true;
}
} }