1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
#pragma once
#include <memory>
#include <optional>
#include "main/query_result.h"
#include "materialized_query_result.h"
namespace lbug {
namespace main {
class ArrowQueryResult : public QueryResult {
// Result-kind tag for this class; always QueryResultType::ARROW. Private, because
// it is declared before the first access specifier of the class.
// NOTE(review): presumably read by type-dispatch/cast helpers declared with
// QueryResult - confirm where it is consumed.
static constexpr QueryResultType type_ = QueryResultType::ARROW;
public:
struct CSRMetadata {
    // ---- Dense (merged) form ------------------------------------------
    // Global row-pointer array. It is filled in ONLY for the merged result
    // built by combineCSRChunks()/kwayMergeCSRChunks(). Per-batch chunks
    // keep it empty and describe their rows through (srcRows, counts)
    // instead, so a batch stores one entry per distinct source row it
    // actually touched rather than numSourceRows+1 entries.
    std::vector<int64_t> indptr;
    // Per-edge entries, grouped by source row.
    // NOTE(review): presumably the destination row id of each edge - confirm.
    std::vector<int64_t> indices;
    // NOTE(review): presumably one edge id per entry of `indices`, filled
    // only when hasEdgeIDs is true - confirm against the producer.
    std::vector<int64_t> edgeIDs;

    // ---- Sparse (per-batch) form --------------------------------------
    // srcRows[i] is a global source row id touched by this batch and
    // counts[i] is how many edges that row has. srcRows is strictly
    // increasing: the rel scan emits edges in non-decreasing source order
    // per thread and morsel acquisition is monotonic.
    //
    // Edges of srcRows[i] occupy indices[offset .. offset + counts[i] - 1],
    // where offset = counts[0] + ... + counts[i-1].
    //
    // srcRows sets of different batches are disjoint (a source node is
    // scanned in exactly one morsel, hence by one thread), so merging is a
    // union of sorted disjoint runs rather than a general k-way merge.
    std::vector<int64_t> srcRows;
    std::vector<int64_t> counts;

    bool hasEdgeIDs{false};

    // Number of source (node) rows in the table, taken from the node table
    // cardinality at plan-mapping time. Used to pad trailing empty rows in
    // indptr.
    int64_t numSourceRows{0};
};
// Move-only holder for one ArrowArray together with its ArrowSchema. The
// destructor calls release(), which invokes whichever release callbacks are
// still set.
struct CSRArrowArray {
    ArrowArray array{};
    ArrowSchema schema{};

    CSRArrowArray() = default;

    // Copying is disabled: two holders sharing the same release callbacks
    // would both invoke them.
    CSRArrowArray(const CSRArrowArray&) = delete;
    CSRArrowArray& operator=(const CSRArrowArray&) = delete;

    // Move construction copies both structs bitwise, then clears the release
    // callbacks on the source so that its destructor does nothing.
    CSRArrowArray(CSRArrowArray&& src) noexcept : array(src.array), schema(src.schema) {
        src.array.release = nullptr;
        src.schema.release = nullptr;
    }

    // Move assignment releases whatever this holder currently has before
    // taking over the source's structs. Self-assignment is a no-op.
    CSRArrowArray& operator=(CSRArrowArray&& src) noexcept {
        if (this == &src) {
            return *this;
        }
        release();
        array = src.array;
        schema = src.schema;
        src.array.release = nullptr;
        src.schema.release = nullptr;
        return *this;
    }

    ~CSRArrowArray() { release(); }

    // Invokes the schema's release callback and then the array's, skipping
    // any that are null. The pointers are not reset here.
    // NOTE(review): this relies on the callbacks clearing `release`
    // themselves, as the Arrow C data interface requires - confirm that all
    // producers used with this type conform, otherwise a second release()
    // would call them again.
    void release() {
        if (schema.release != nullptr) {
            schema.release(&schema);
        }
        if (array.release != nullptr) {
            array.release(&array);
        }
    }
};
struct CSRArrowArrays {
CSRArrowArray indptr;
CSRArrowArray indices;
std::optional<CSRArrowArray> edgeIDs;
};
// View of the merged Arrow arrays as a chunked sequence, similar to
// Arrow C++ lib's arrow::ChunkedArray. We expose per-batch ArrowArrays
// in batch_index order instead of concatenating them via arrow::Concat
// (which we don't link); python users can construct pyarrow.ChunkedArray
// from these chunks and call combine_chunks() to materialize on the
// consumer side. Chunk order is the natural global row order for the
// query.
//
// Non-owning view: ArrowQueryResult retains ownership of the underlying
// chunk buffers. The caller is responsible for calling release on each
// chunk's ArrowArray when done, to free the buffers (matches the
// existing hasNextArrowChunk / getNextArrowChunk contract).
struct ArrowChunkedArray {
std::shared_ptr<const std::vector<ArrowArray>> chunks;
ArrowChunkedArray() = default;
explicit ArrowChunkedArray(std::shared_ptr<const std::vector<ArrowArray>> chunks)
: chunks{std::move(chunks)} {}
int64_t numChunks() const { return chunks ? static_cast<int64_t>(chunks->size()) : 0; }
int64_t length() const {
if (!chunks) {
return 0;
}
int64_t total = 0;
for (const auto& c : *chunks) {
total += c.length;
}
return total;
}
const ArrowArray* data() const {
return chunks && !chunks->empty() ? chunks->data() : nullptr;
}
bool empty() const { return !chunks || chunks->empty(); }
};
// Constructors. All three are declared only; the definitions are not part of
// this header.
//
// Build a result from already-produced Arrow arrays.
// NOTE(review): `chunkSize` is presumably the row count per chunk that the
// arrays were produced with - confirm in the implementation.
ArrowQueryResult(std::vector<ArrowArray> arrays, int64_t chunkSize);
// Same inputs plus the per-batch CSR chunks (see getCSRChunks()).
ArrowQueryResult(std::vector<ArrowArray> arrays, int64_t chunkSize,
std::vector<CSRMetadata> csrChunks);
// Build a result from a FactorizedTable and its column names/types.
// NOTE(review): presumably converts the table to Arrow chunks through
// getArray() - confirm in the implementation.
ArrowQueryResult(std::vector<std::string> columnNames,
std::vector<common::LogicalType> columnTypes, processor::FactorizedTable& table,
int64_t chunkSize);
// QueryResult interface overrides (declared only in this header).
// Tuple-oriented access:
uint64_t getNumTuples() const override;
bool hasNext() const override;
std::shared_ptr<processor::FlatTuple> getNext() override;
void resetIterator() override;
std::string toString() const override;
// Arrow chunk iteration. Note that hasNextArrowChunk() is non-const, unlike
// hasNext().
// NOTE(review): how the `chunkSize` argument interacts with the chunk size
// given at construction is not visible here - confirm in the implementation.
bool hasNextArrowChunk() override;
std::unique_ptr<ArrowArray> getNextArrowChunk(int64_t chunkSize) override;
// Get all ArrowArrays as a chunked view (ArrowChunkedArray, similar
// to Arrow C++ lib's arrow::ChunkedArray). Coexists with the
// hasNextArrowChunk / getNextArrowChunk iteration API; both refer to
// the same underlying chunks.
ArrowChunkedArray getArrowChunks() const;
// CSR metadata is held in chunked form: one chunk per batch, kept in
// batch_index order (the order the physical collector produces them in).
// Merging across batches is deferred until the first call to
// combineCSRChunks() / getCSRMetadata() / getCSRArrowArrays(), and the
// merged result is then cached. As a consequence NO_ORDER /
// INSERTION_ORDER queries do zero work at result-construction time.
//
// combineCSRChunks() std::moves and consumes the per-batch chunks, freeing
// them incrementally while they are copied into the merged arrays, so
// csrChunks is empty afterwards. hasCSRMetadata() still reports true in that
// state because it also checks the cached merged result.
bool hasCSRMetadata() const {
    if (combinedCSR != nullptr) {
        return true;
    }
    return !csrChunks.empty();
}
// Returns the merged metadata; forwards to combineCSRChunks(), so the first
// call triggers the merge.
const CSRMetadata& getCSRMetadata() const { return combineCSRChunks(); }
CSRArrowArrays getCSRArrowArrays() const;
// Per-batch CSR chunks (in batch_index order), before merging. No merge
// is performed until combineCSRChunks() is called; this is the
// ChunkedArray-style view for callers that want to combine on the
// consumer side. NOTE: combineCSRChunks() consumes (moves) these chunks
// to free their memory as the merged arrays are built, so this returns
// an empty vector after the first merge call. getCSRMetadata() forwards to
// combineCSRChunks(), so calling it has the same effect on this vector.
const std::vector<CSRMetadata>& getCSRChunks() const { return csrChunks; }
// K-way merge the per-batch CSR chunks in batch_index order into a
// single CSRMetadata. Result is cached; subsequent calls are O(1).
// NOTE(review): the CSRMetadata comments describe the merge as a union of
// sorted disjoint runs rather than a general k-way merge - confirm which
// wording matches the implementation.
// NOTE(review): this const member mutates the mutable csrChunks/combinedCSR
// members and no synchronization is visible in this header - confirm whether
// concurrent first calls are expected to be safe.
const CSRMetadata& combineCSRChunks() const;
private:
// Produces one ArrowArray from `iterator` (declared only; defined elsewhere).
// NOTE(review): presumably consumes up to `chunkSize` tuples from the
// iterator per call - confirm in the implementation.
ArrowArray getArray(processor::FactorizedTableIterator& iterator, int64_t chunkSize);
private:
// Shared with ArrowChunkedArray views so the chunked view and the
// hasNextArrowChunk / getNextArrowChunk iteration API can coexist
// without invalidating each other.
std::shared_ptr<std::vector<ArrowArray>> arraysStorage;
// NOTE(review): presumably the chunkSize passed to the constructor - confirm.
// It has no default member initializer, so every constructor must set it.
int64_t chunkSize_;
// NOTE(review): presumably the total tuple count reported by getNumTuples()
// and the current iteration position, respectively - confirm.
uint64_t numTuples = 0;
uint64_t cursor = 0;
// Mutable because combineCSRChunks() (a const accessor, called from the
// const getCSRMetadata()/getCSRArrowArrays()) std::moves these chunks
// into the merge to free their memory as the merged arrays are built.
mutable std::vector<CSRMetadata> csrChunks;
// Cached merged CSR metadata. hasCSRMetadata() treats a non-null value as
// "metadata present" even after csrChunks has been emptied by the merge.
mutable std::shared_ptr<const CSRMetadata> combinedCSR;
};
} // namespace main
} // namespace lbug