1use super::*;
2use crate::gather::{
3 RowLocator, filter_rows_with_non_null as shared_filter_rows_with_non_null,
4 gather_rows_from_chunks as shared_gather_rows_from_chunks,
5 gather_rows_from_chunks_binary as shared_gather_rows_from_chunks_binary,
6 gather_rows_from_chunks_bool as shared_gather_rows_from_chunks_bool,
7 gather_rows_from_chunks_string as shared_gather_rows_from_chunks_string,
8 gather_rows_from_chunks_struct as shared_gather_rows_from_chunks_struct,
9 gather_rows_single_shot as shared_gather_rows_single_shot,
10 gather_rows_single_shot_binary as shared_gather_rows_single_shot_binary,
11 gather_rows_single_shot_bool as shared_gather_rows_single_shot_bool,
12 gather_rows_single_shot_string as shared_gather_rows_single_shot_string,
13 gather_rows_single_shot_struct as shared_gather_rows_single_shot_struct,
14};
15use crate::store::descriptor::{ChunkMetadata, ColumnDescriptor, DescriptorIterator};
16use crate::types::{LogicalFieldId, RowId};
17use arrow::array::{ArrayRef, OffsetSizeTrait, new_empty_array};
18use arrow::datatypes::{ArrowPrimitiveType, DataType, Field, Schema};
19use arrow::record_batch::RecordBatch;
20use llkv_result::{Error, Result};
21use llkv_storage::{
22 pager::{BatchGet, GetResult, Pager},
23 serialization::deserialize_array,
24 types::PhysicalKey,
25};
26use rustc_hash::{FxHashMap, FxHashSet};
27use simd_r_drive_entry_handle::EntryHandle;
28use std::borrow::Cow;
29use std::sync::Arc;
30
/// Controls how a gather treats requested row ids that have no value in a
/// column.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum GatherNullPolicy {
    /// Fail the gather if any requested row id is missing from a column.
    ErrorOnMissing,
    /// Emit a null in the output array for each missing row id.
    IncludeNulls,
    /// Gather with nulls, then drop any output row where every column is null.
    DropNulls,
}
40
/// A single output column request: which logical field to gather and an
/// optional alias for the resulting column name.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct Projection {
    /// The logical field (column) to project.
    pub logical_field_id: LogicalFieldId,
    /// Optional output name; when `None`, callers derive a default name.
    pub alias: Option<String>,
}
46
47impl Projection {
48 pub fn new(logical_field_id: LogicalFieldId) -> Self {
49 Self {
50 logical_field_id,
51 alias: None,
52 }
53 }
54
55 pub fn with_alias<S: Into<String>>(logical_field_id: LogicalFieldId, alias: S) -> Self {
56 Self {
57 logical_field_id,
58 alias: Some(alias.into()),
59 }
60 }
61}
62
63impl From<LogicalFieldId> for Projection {
64 fn from(logical_field_id: LogicalFieldId) -> Self {
65 Projection::new(logical_field_id)
66 }
67}
68
69impl<S: Into<String>> From<(LogicalFieldId, S)> for Projection {
70 fn from(value: (LogicalFieldId, S)) -> Self {
71 Projection::with_alias(value.0, value.1)
72 }
73}
74
75impl GatherNullPolicy {
76 #[inline]
77 fn allow_missing(self) -> bool {
78 !matches!(self, Self::ErrorOnMissing)
79 }
80}
81
/// Reusable state for gathering the same set of fields repeatedly.
///
/// Holds the per-field gather plans plus scratch buffers and a deserialized
/// chunk cache so that repeated calls to
/// `ColumnStore::gather_rows_with_reusable_context` avoid re-planning and
/// re-allocating.
pub struct MultiGatherContext {
    // (field id, Arrow data type) in projection order.
    field_infos: Vec<(LogicalFieldId, DataType)>,
    // One plan per field, parallel to `field_infos`.
    plans: Vec<FieldPlan>,
    // Deserialized chunk arrays keyed by physical key; persists across calls.
    chunk_cache: FxHashMap<PhysicalKey, ArrayRef>,
    // Scratch: row_id -> output position map, reused between calls.
    row_index: FxHashMap<u64, usize>,
    // Scratch: per-output-row (chunk index, offset in chunk) slots.
    row_scratch: Vec<Option<(usize, usize)>>,
    // Scratch: physical keys of chunks needed by the current call.
    chunk_keys: Vec<PhysicalKey>,
}
90
91impl MultiGatherContext {
92 fn new(field_infos: Vec<(LogicalFieldId, DataType)>, plans: Vec<FieldPlan>) -> Self {
93 Self {
94 chunk_cache: FxHashMap::default(),
95 row_index: FxHashMap::default(),
96 row_scratch: Vec::new(),
97 chunk_keys: Vec::new(),
98 field_infos,
99 plans,
100 }
101 }
102
103 #[inline]
104 pub fn is_empty(&self) -> bool {
105 self.plans.is_empty()
106 }
107
108 #[inline]
109 fn field_infos(&self) -> &[(LogicalFieldId, DataType)] {
110 &self.field_infos
111 }
112
113 #[inline]
114 fn plans(&self) -> &[FieldPlan] {
115 &self.plans
116 }
117
118 #[inline]
119 fn chunk_cache(&self) -> &FxHashMap<PhysicalKey, ArrayRef> {
120 &self.chunk_cache
121 }
122
123 #[inline]
124 fn chunk_cache_mut(&mut self) -> &mut FxHashMap<PhysicalKey, ArrayRef> {
125 &mut self.chunk_cache
126 }
127
128 #[inline]
129 fn plans_mut(&mut self) -> &mut [FieldPlan] {
130 &mut self.plans
131 }
132
133 fn take_chunk_keys(&mut self) -> Vec<PhysicalKey> {
134 std::mem::take(&mut self.chunk_keys)
135 }
136
137 fn store_chunk_keys(&mut self, keys: Vec<PhysicalKey>) {
138 self.chunk_keys = keys;
139 }
140
141 fn take_row_index(&mut self) -> FxHashMap<u64, usize> {
142 std::mem::take(&mut self.row_index)
143 }
144
145 fn store_row_index(&mut self, row_index: FxHashMap<u64, usize>) {
146 self.row_index = row_index;
147 }
148
149 fn take_row_scratch(&mut self) -> Vec<Option<(usize, usize)>> {
150 std::mem::take(&mut self.row_scratch)
151 }
152
153 fn store_row_scratch(&mut self, scratch: Vec<Option<(usize, usize)>>) {
154 self.row_scratch = scratch;
155 }
156
157 pub fn chunk_span_for_row(&self, row_id: RowId) -> Option<(usize, RowId, RowId)> {
158 let first_plan = self.plans.first()?;
159 let mut chunk_idx = None;
160 for (idx, meta) in first_plan.row_metas.iter().enumerate() {
161 if row_id >= meta.min_val_u64 && row_id <= meta.max_val_u64 {
162 chunk_idx = Some(idx);
163 break;
164 }
165 }
166 if chunk_idx.is_none() {
167 let total_chunks = first_plan.row_metas.len();
168 'outer: for idx in 0..total_chunks {
169 for plan in &self.plans {
170 let meta = &plan.row_metas[idx];
171 if row_id >= meta.min_val_u64 && row_id <= meta.max_val_u64 {
172 chunk_idx = Some(idx);
173 break 'outer;
174 }
175 }
176 }
177 }
178 let idx = chunk_idx?;
179
180 let mut span_min = u64::MAX;
181 let mut span_max = 0u64;
182 for plan in &self.plans {
183 let meta = plan.row_metas.get(idx)?;
184 span_min = span_min.min(meta.min_val_u64);
185 span_max = span_max.max(meta.max_val_u64);
186 }
187
188 if span_min > span_max {
189 return None;
190 }
191
192 Some((idx, span_min, span_max))
193 }
194}
195
/// Per-field gather plan: chunk metadata for the value column and its paired
/// row-id column, plus the scratch list of chunks relevant to the current
/// request.
#[derive(Clone, Debug)]
struct FieldPlan {
    // Arrow type of the value column.
    dtype: DataType,
    // Chunk metadata for the value column (non-empty chunks only).
    value_metas: Vec<ChunkMetadata>,
    // Chunk metadata for the companion row-id column; parallel to
    // `value_metas` (lengths are validated at plan-build time).
    row_metas: Vec<ChunkMetadata>,
    // Indices into the meta vectors for chunks that may contain requested
    // rows; recomputed per gather call.
    candidate_indices: Vec<usize>,
}
203
impl<P> ColumnStore<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    /// Gather the values of `field_ids` at `row_ids` into one `RecordBatch`
    /// (one column per field, one row per requested row id).
    ///
    /// Convenience wrapper that plans a fresh [`MultiGatherContext`] and runs
    /// a single-pass gather. Callers gathering repeatedly over the same
    /// fields should use [`Self::prepare_gather_context`] together with
    /// [`Self::gather_rows_with_reusable_context`] to amortize planning and
    /// chunk deserialization.
    pub fn gather_rows(
        &self,
        field_ids: &[LogicalFieldId],
        row_ids: &[u64],
        policy: GatherNullPolicy,
    ) -> Result<RecordBatch> {
        let mut ctx = self.prepare_gather_context(field_ids)?;
        self.execute_gather_single_pass(&mut ctx, row_ids, policy)
    }

    /// Core one-shot gather behind [`Self::gather_rows`].
    ///
    /// Selects candidate chunks per field, fetches every needed chunk blob
    /// from the pager in a single batched read, and materializes one output
    /// array per field. Chunk blobs are held as raw `EntryHandle`s for this
    /// call only — nothing is cached on `ctx`.
    ///
    /// Returns `Error::Internal` on duplicate `row_ids` or an unsupported
    /// dtype.
    fn execute_gather_single_pass(
        &self,
        ctx: &mut MultiGatherContext,
        row_ids: &[u64],
        policy: GatherNullPolicy,
    ) -> Result<RecordBatch> {
        if ctx.is_empty() {
            return Ok(RecordBatch::new_empty(Arc::new(Schema::empty())));
        }

        let field_infos = ctx.field_infos().to_vec();

        // No rows requested: return a zero-row batch that still carries one
        // (nullable) column per requested field.
        if row_ids.is_empty() {
            let mut arrays = Vec::with_capacity(field_infos.len());
            let mut fields = Vec::with_capacity(field_infos.len());
            for (fid, dtype) in &field_infos {
                arrays.push(new_empty_array(dtype));
                let field_name = format!("field_{}", u64::from(*fid));
                fields.push(Field::new(field_name, dtype.clone(), true));
            }
            let schema = Arc::new(Schema::new(fields));
            return RecordBatch::try_new(schema, arrays)
                .map_err(|e| Error::Internal(format!("gather_rows_multi empty batch: {e}")));
        }

        // Map row_id -> output position. Duplicates are rejected because each
        // output slot must correspond to exactly one requested row id.
        let mut row_index: FxHashMap<u64, usize> =
            FxHashMap::with_capacity_and_hasher(row_ids.len(), Default::default());
        for (idx, &row_id) in row_ids.iter().enumerate() {
            if row_index.insert(row_id, idx).is_some() {
                return Err(Error::Internal(
                    "duplicate row_id in gather_rows_multi".into(),
                ));
            }
        }

        // Sorted copy used for the chunk-bound intersection tests below.
        let mut sorted_row_ids = row_ids.to_vec();
        sorted_row_ids.sort_unstable();

        // Mark candidate chunks per plan and collect the set of physical keys
        // (value + row-id chunks) that must be fetched.
        let mut chunk_keys: FxHashSet<PhysicalKey> = FxHashSet::default();
        {
            let plans_mut = ctx.plans_mut();
            for plan in plans_mut.iter_mut() {
                plan.candidate_indices.clear();
                for (idx, meta) in plan.row_metas.iter().enumerate() {
                    if Self::chunk_intersects(&sorted_row_ids, meta) {
                        plan.candidate_indices.push(idx);
                        chunk_keys.insert(plan.value_metas[idx].chunk_pk);
                        chunk_keys.insert(plan.row_metas[idx].chunk_pk);
                    }
                }
            }
        }

        let mut chunk_requests = Vec::with_capacity(chunk_keys.len());
        for &key in &chunk_keys {
            chunk_requests.push(BatchGet::Raw { key });
        }

        // Single batched pager read for every chunk blob this gather needs.
        let mut chunk_map: FxHashMap<PhysicalKey, EntryHandle> =
            FxHashMap::with_capacity_and_hasher(chunk_requests.len(), Default::default());
        if !chunk_requests.is_empty() {
            let chunk_results = self.pager.batch_get(&chunk_requests)?;
            for result in chunk_results {
                if let GetResult::Raw { key, bytes } = result {
                    chunk_map.insert(key, bytes);
                }
            }
        }

        let allow_missing = policy.allow_missing();

        // Materialize one output array per field, dispatching on dtype to the
        // matching single-shot gather kernel.
        let mut outputs = Vec::with_capacity(ctx.plans().len());
        for plan in ctx.plans() {
            let array = match &plan.dtype {
                DataType::Utf8 => Self::gather_rows_single_shot_string::<i32>(
                    &row_index,
                    row_ids.len(),
                    plan,
                    &mut chunk_map,
                    allow_missing,
                ),
                DataType::LargeUtf8 => Self::gather_rows_single_shot_string::<i64>(
                    &row_index,
                    row_ids.len(),
                    plan,
                    &mut chunk_map,
                    allow_missing,
                ),
                DataType::Binary => Self::gather_rows_single_shot_binary::<i32>(
                    &row_index,
                    row_ids.len(),
                    plan,
                    &mut chunk_map,
                    allow_missing,
                ),
                DataType::LargeBinary => Self::gather_rows_single_shot_binary::<i64>(
                    &row_index,
                    row_ids.len(),
                    plan,
                    &mut chunk_map,
                    allow_missing,
                ),
                DataType::Boolean => Self::gather_rows_single_shot_bool(
                    &row_index,
                    row_ids.len(),
                    plan,
                    &mut chunk_map,
                    allow_missing,
                ),
                DataType::Struct(_) => Self::gather_rows_single_shot_struct(
                    &row_index,
                    row_ids.len(),
                    plan,
                    &mut chunk_map,
                    allow_missing,
                    &plan.dtype,
                ),
                // Everything else must be an integer/primitive Arrow type.
                other => with_integer_arrow_type!(
                    other.clone(),
                    |ArrowTy| {
                        Self::gather_rows_single_shot::<ArrowTy>(
                            &row_index,
                            row_ids.len(),
                            plan,
                            &mut chunk_map,
                            allow_missing,
                        )
                    },
                    Err(Error::Internal(format!(
                        "gather_rows_multi: unsupported dtype {:?}",
                        other
                    ))),
                ),
            }?;
            outputs.push(array);
        }

        // DropNulls: remove output rows where every column is null.
        let outputs = if matches!(policy, GatherNullPolicy::DropNulls) {
            Self::filter_rows_with_non_null(outputs)?
        } else {
            outputs
        };

        // Build the schema; with IncludeNulls the field is always declared
        // nullable, otherwise nullability reflects the actual null count.
        let mut fields = Vec::with_capacity(field_infos.len());
        for (idx, (fid, dtype)) in field_infos.iter().enumerate() {
            let array = &outputs[idx];
            let field_name = format!("field_{}", u64::from(*fid));
            let nullable = match policy {
                GatherNullPolicy::IncludeNulls => true,
                _ => array.null_count() > 0,
            };
            fields.push(Field::new(field_name, dtype.clone(), nullable));
        }

        let schema = Arc::new(Schema::new(fields));
        RecordBatch::try_new(schema, outputs)
            .map_err(|e| Error::Internal(format!("gather_rows_multi batch: {e}")))
    }

    /// Build a reusable [`MultiGatherContext`] for `field_ids`.
    ///
    /// For each field, resolves both the value column and its paired row-id
    /// column (via `rowid_fid`) from the catalog, fetches both column
    /// descriptors in one batched pager read, and collects their non-empty
    /// chunk metadata into a [`FieldPlan`].
    ///
    /// # Errors
    /// * `Error::NotFound` — a field (or its row-id companion) is absent from
    ///   the catalog, or a descriptor blob could not be fetched.
    /// * `Error::Internal` — a field's value and row-id chunk counts differ.
    pub fn prepare_gather_context(
        &self,
        field_ids: &[LogicalFieldId],
    ) -> Result<MultiGatherContext> {
        let mut field_infos = Vec::with_capacity(field_ids.len());
        for &fid in field_ids {
            field_infos.push((fid, self.data_type(fid)?));
        }

        if field_infos.is_empty() {
            return Ok(MultiGatherContext::new(Vec::new(), Vec::new()));
        }

        // Resolve the physical keys of both descriptors per field while
        // holding the catalog lock, then drop the lock before pager I/O.
        let catalog = self.catalog.read().unwrap();
        let mut key_pairs = Vec::with_capacity(field_infos.len());
        for (fid, _) in &field_infos {
            let value_pk = *catalog.map.get(fid).ok_or(Error::NotFound)?;
            let row_pk = *catalog.map.get(&rowid_fid(*fid)).ok_or(Error::NotFound)?;
            key_pairs.push((value_pk, row_pk));
        }
        drop(catalog);

        // One batched read for every descriptor blob.
        let mut descriptor_requests = Vec::with_capacity(key_pairs.len() * 2);
        for (value_pk, row_pk) in &key_pairs {
            descriptor_requests.push(BatchGet::Raw { key: *value_pk });
            descriptor_requests.push(BatchGet::Raw { key: *row_pk });
        }
        let descriptor_results = self.pager.batch_get(&descriptor_requests)?;
        let mut descriptor_map: FxHashMap<PhysicalKey, EntryHandle> = FxHashMap::default();
        for result in descriptor_results {
            if let GetResult::Raw { key, bytes } = result {
                descriptor_map.insert(key, bytes);
            }
        }

        let mut plans = Vec::with_capacity(field_infos.len());
        for ((_, dtype), (value_pk, row_pk)) in field_infos.iter().zip(key_pairs.iter()) {
            let value_desc_blob = descriptor_map.remove(value_pk).ok_or(Error::NotFound)?;
            let value_desc = ColumnDescriptor::from_le_bytes(value_desc_blob.as_ref());
            let value_metas =
                Self::collect_non_empty_metas(self.pager.as_ref(), value_desc.head_page_pk)?;

            let row_desc_blob = descriptor_map.remove(row_pk).ok_or(Error::NotFound)?;
            let row_desc = ColumnDescriptor::from_le_bytes(row_desc_blob.as_ref());
            let row_metas =
                Self::collect_non_empty_metas(self.pager.as_ref(), row_desc.head_page_pk)?;

            // The gather kernels pair value_metas[i] with row_metas[i], so the
            // chunk lists must line up exactly.
            if value_metas.len() != row_metas.len() {
                return Err(Error::Internal(
                    "gather_rows_multi: chunk count mismatch".into(),
                ));
            }

            plans.push(FieldPlan {
                dtype: dtype.clone(),
                value_metas,
                row_metas,
                candidate_indices: Vec::new(),
            });
        }

        Ok(MultiGatherContext::new(field_infos, plans))
    }

    /// Gather `row_ids` using a context prepared by
    /// [`Self::prepare_gather_context`], reusing its scratch buffers and its
    /// cache of deserialized chunk arrays across calls.
    ///
    /// Fast paths: already non-decreasing `row_ids` skip the sort (via `Cow`),
    /// and a dense consecutive range skips building the hash index entirely
    /// and uses a dense row locator instead.
    ///
    /// Returns `Error::Internal` on duplicate `row_ids` or an unsupported
    /// dtype.
    pub fn gather_rows_with_reusable_context(
        &self,
        ctx: &mut MultiGatherContext,
        row_ids: &[u64],
        policy: GatherNullPolicy,
    ) -> Result<RecordBatch> {
        if ctx.is_empty() {
            return Ok(RecordBatch::new_empty(Arc::new(Schema::empty())));
        }

        // No rows requested: zero-row batch with one column per field.
        if row_ids.is_empty() {
            let mut arrays = Vec::with_capacity(ctx.field_infos().len());
            let mut fields = Vec::with_capacity(ctx.field_infos().len());
            for (fid, dtype) in ctx.field_infos() {
                arrays.push(new_empty_array(dtype));
                let field_name = format!("field_{}", u64::from(*fid));
                fields.push(Field::new(field_name, dtype.clone(), true));
            }
            let schema = Arc::new(Schema::new(fields));
            return RecordBatch::try_new(schema, arrays)
                .map_err(|e| Error::Internal(format!("gather_rows_multi empty batch: {e}")));
        }

        // Borrow the context's scratch buffers for the duration of this call;
        // they are stored back after the closure runs, even on error.
        let mut row_index = ctx.take_row_index();
        let mut row_scratch = ctx.take_row_scratch();

        let field_infos = ctx.field_infos().to_vec();
        let mut chunk_keys = ctx.take_chunk_keys();

        let result: Result<RecordBatch> = (|| {
            let len = row_ids.len();
            if row_scratch.len() < len {
                row_scratch.resize(len, None);
            }

            // Avoid sorting when the input is already non-decreasing.
            let is_non_decreasing = len <= 1 || row_ids.windows(2).all(|w| w[0] <= w[1]);
            let sorted_row_ids_cow: Cow<'_, [u64]> = if is_non_decreasing {
                Cow::Borrowed(row_ids)
            } else {
                let mut buf = row_ids.to_vec();
                buf.sort_unstable();
                Cow::Owned(buf)
            };
            let sorted_row_ids: &[u64] = sorted_row_ids_cow.as_ref();

            // Dense detection: strictly consecutive ids let us locate rows by
            // offset from the base instead of a hash lookup per row.
            let dense_base = if len == 0 {
                None
            } else if len == 1 || is_non_decreasing && row_ids.windows(2).all(|w| w[1] == w[0] + 1)
            {
                Some(row_ids[0])
            } else {
                None
            };

            if dense_base.is_none() {
                // Sparse path: build row_id -> output position, rejecting
                // duplicates (each slot maps to exactly one row id).
                row_index.clear();
                row_index.reserve(len);
                for (idx, &row_id) in row_ids.iter().enumerate() {
                    if row_index.insert(row_id, idx).is_some() {
                        return Err(Error::Internal(
                            "duplicate row_id in gather_rows_multi".into(),
                        ));
                    }
                }
            } else {
                row_index.clear();
            }

            let row_locator = if let Some(base) = dense_base {
                RowLocator::Dense { base }
            } else {
                RowLocator::Sparse { index: &row_index }
            };

            chunk_keys.clear();

            // Mark candidate chunks per plan and gather the physical keys
            // (value + row-id chunks) needed by this call.
            {
                let plans_mut = ctx.plans_mut();
                for plan in plans_mut.iter_mut() {
                    plan.candidate_indices.clear();
                    for (idx, meta) in plan.row_metas.iter().enumerate() {
                        if Self::chunk_intersects(sorted_row_ids, meta) {
                            plan.candidate_indices.push(idx);
                            chunk_keys.push(plan.value_metas[idx].chunk_pk);
                            chunk_keys.push(plan.row_metas[idx].chunk_pk);
                        }
                    }
                }
            }

            chunk_keys.sort_unstable();
            chunk_keys.dedup();

            // Fetch and deserialize only the chunks not already cached.
            {
                let mut pending: Vec<BatchGet> = Vec::new();
                {
                    let cache = ctx.chunk_cache();
                    for &key in &chunk_keys {
                        if !cache.contains_key(&key) {
                            pending.push(BatchGet::Raw { key });
                        }
                    }
                }

                if !pending.is_empty() {
                    let chunk_results = self.pager.batch_get(&pending)?;
                    let cache = ctx.chunk_cache_mut();
                    for result in chunk_results {
                        if let GetResult::Raw { key, bytes } = result {
                            let array = deserialize_array(bytes)?;
                            cache.insert(key, Arc::clone(&array));
                        }
                    }
                }
            }

            let allow_missing = policy.allow_missing();

            // Materialize one output array per field, dispatching on dtype to
            // the matching chunk-cache gather kernel.
            let mut outputs = Vec::with_capacity(ctx.plans().len());
            for plan in ctx.plans() {
                let array = match &plan.dtype {
                    DataType::Utf8 => Self::gather_rows_from_chunks_string::<i32>(
                        row_ids,
                        row_locator,
                        len,
                        &plan.candidate_indices,
                        plan,
                        ctx.chunk_cache(),
                        &mut row_scratch,
                        allow_missing,
                    ),
                    DataType::LargeUtf8 => Self::gather_rows_from_chunks_string::<i64>(
                        row_ids,
                        row_locator,
                        len,
                        &plan.candidate_indices,
                        plan,
                        ctx.chunk_cache(),
                        &mut row_scratch,
                        allow_missing,
                    ),
                    DataType::Binary => Self::gather_rows_from_chunks_binary::<i32>(
                        row_ids,
                        row_locator,
                        len,
                        &plan.candidate_indices,
                        plan,
                        ctx.chunk_cache(),
                        &mut row_scratch,
                        allow_missing,
                    ),
                    DataType::LargeBinary => Self::gather_rows_from_chunks_binary::<i64>(
                        row_ids,
                        row_locator,
                        len,
                        &plan.candidate_indices,
                        plan,
                        ctx.chunk_cache(),
                        &mut row_scratch,
                        allow_missing,
                    ),
                    DataType::Boolean => Self::gather_rows_from_chunks_bool(
                        row_ids,
                        row_locator,
                        len,
                        &plan.candidate_indices,
                        plan,
                        ctx.chunk_cache(),
                        &mut row_scratch,
                        allow_missing,
                    ),
                    DataType::Struct(_) => Self::gather_rows_from_chunks_struct(
                        row_locator,
                        len,
                        &plan.candidate_indices,
                        plan,
                        ctx.chunk_cache(),
                        &mut row_scratch,
                        allow_missing,
                        &plan.dtype,
                    ),
                    // Everything else must be an integer/primitive Arrow type.
                    other => with_integer_arrow_type!(
                        other.clone(),
                        |ArrowTy| {
                            Self::gather_rows_from_chunks::<ArrowTy>(
                                row_ids,
                                row_locator,
                                len,
                                &plan.candidate_indices,
                                plan,
                                ctx.chunk_cache(),
                                &mut row_scratch,
                                allow_missing,
                            )
                        },
                        Err(Error::Internal(format!(
                            "gather_rows_multi: unsupported dtype {:?}",
                            other
                        ))),
                    ),
                }?;
                outputs.push(array);
            }

            // DropNulls: remove output rows where every column is null.
            let outputs = if matches!(policy, GatherNullPolicy::DropNulls) {
                Self::filter_rows_with_non_null(outputs)?
            } else {
                outputs
            };

            // Schema nullability mirrors execute_gather_single_pass:
            // IncludeNulls always declares nullable, otherwise reflect the
            // actual null count.
            let mut fields = Vec::with_capacity(field_infos.len());
            for (idx, (fid, dtype)) in field_infos.iter().enumerate() {
                let array = &outputs[idx];
                let field_name = format!("field_{}", u64::from(*fid));
                let nullable = match policy {
                    GatherNullPolicy::IncludeNulls => true,
                    _ => array.null_count() > 0,
                };
                fields.push(Field::new(field_name, dtype.clone(), nullable));
            }

            let schema = Arc::new(Schema::new(fields));
            RecordBatch::try_new(schema, outputs)
                .map_err(|e| Error::Internal(format!("gather_rows_multi batch: {e}")))
        })();

        // Return the borrowed scratch buffers to the context regardless of
        // whether the gather succeeded.
        ctx.store_row_scratch(row_scratch);
        ctx.store_row_index(row_index);
        ctx.store_chunk_keys(chunk_keys);

        result
    }

    /// Walk a descriptor page chain and collect metadata for chunks that
    /// actually contain rows. A `head_page_pk` of 0 is treated as "no chunk
    /// pages" and yields an empty list.
    fn collect_non_empty_metas(pager: &P, head_page_pk: PhysicalKey) -> Result<Vec<ChunkMetadata>> {
        let mut metas = Vec::new();
        if head_page_pk == 0 {
            return Ok(metas);
        }
        for meta in DescriptorIterator::new(pager, head_page_pk) {
            let meta = meta?;
            if meta.row_count > 0 {
                metas.push(meta);
            }
        }
        Ok(metas)
    }

    /// Whether any of `sorted_row_ids` (ascending) can fall inside the chunk
    /// described by `meta`, judged by its `[min_val_u64, max_val_u64]` row-id
    /// bounds.
    #[inline]
    fn chunk_intersects(sorted_row_ids: &[u64], meta: &ChunkMetadata) -> bool {
        if sorted_row_ids.is_empty() || meta.row_count == 0 {
            return false;
        }
        let min = meta.min_val_u64;
        let max = meta.max_val_u64;
        // NOTE(review): min == max == 0 with rows present, and min > max, both
        // look like "bounds not populated" sentinels and are conservatively
        // treated as intersecting — confirm against the writer's metadata
        // invariants.
        if min == 0 && max == 0 && meta.row_count > 0 {
            return true;
        }
        if min > max {
            return true;
        }
        let min_req = sorted_row_ids[0];
        let max_req = *sorted_row_ids.last().unwrap();
        if max < min_req || min > max_req {
            return false;
        }
        // First requested id >= min intersects iff it is also <= max.
        let idx = sorted_row_ids.partition_point(|&rid| rid < min);
        idx < sorted_row_ids.len() && sorted_row_ids[idx] <= max
    }

    /// Delegates to [`shared_gather_rows_single_shot_string`], expanding the
    /// plan into its meta/candidate slices.
    fn gather_rows_single_shot_string<O>(
        row_index: &FxHashMap<u64, usize>,
        len: usize,
        plan: &FieldPlan,
        chunk_blobs: &mut FxHashMap<PhysicalKey, EntryHandle>,
        allow_missing: bool,
    ) -> Result<ArrayRef>
    where
        O: OffsetSizeTrait,
    {
        shared_gather_rows_single_shot_string::<O>(
            row_index,
            len,
            &plan.value_metas,
            &plan.row_metas,
            &plan.candidate_indices,
            chunk_blobs,
            allow_missing,
        )
    }

    /// Delegates to [`shared_gather_rows_single_shot_bool`].
    #[allow(clippy::too_many_arguments)]
    fn gather_rows_single_shot_bool(
        row_index: &FxHashMap<u64, usize>,
        len: usize,
        plan: &FieldPlan,
        chunk_blobs: &mut FxHashMap<PhysicalKey, EntryHandle>,
        allow_missing: bool,
    ) -> Result<ArrayRef> {
        shared_gather_rows_single_shot_bool(
            row_index,
            len,
            &plan.value_metas,
            &plan.row_metas,
            &plan.candidate_indices,
            chunk_blobs,
            allow_missing,
        )
    }

    /// Delegates to [`shared_gather_rows_single_shot_struct`].
    fn gather_rows_single_shot_struct(
        row_index: &FxHashMap<u64, usize>,
        len: usize,
        plan: &FieldPlan,
        chunk_blobs: &mut FxHashMap<PhysicalKey, EntryHandle>,
        allow_missing: bool,
        dtype: &DataType,
    ) -> Result<ArrayRef> {
        shared_gather_rows_single_shot_struct(
            row_index,
            len,
            &plan.value_metas,
            &plan.row_metas,
            &plan.candidate_indices,
            chunk_blobs,
            allow_missing,
            dtype,
        )
    }

    /// Delegates to [`shared_gather_rows_single_shot`] for primitive types.
    #[allow(clippy::too_many_arguments)]
    fn gather_rows_single_shot<T>(
        row_index: &FxHashMap<u64, usize>,
        len: usize,
        plan: &FieldPlan,
        chunk_blobs: &mut FxHashMap<PhysicalKey, EntryHandle>,
        allow_missing: bool,
    ) -> Result<ArrayRef>
    where
        T: ArrowPrimitiveType,
    {
        shared_gather_rows_single_shot::<T>(
            row_index,
            len,
            &plan.value_metas,
            &plan.row_metas,
            &plan.candidate_indices,
            chunk_blobs,
            allow_missing,
        )
    }

    /// Delegates to [`shared_gather_rows_from_chunks_string`] (cached-chunk
    /// path).
    #[allow(clippy::too_many_arguments)]
    fn gather_rows_from_chunks_string<O>(
        row_ids: &[u64],
        row_locator: RowLocator,
        len: usize,
        candidate_indices: &[usize],
        plan: &FieldPlan,
        chunk_arrays: &FxHashMap<PhysicalKey, ArrayRef>,
        row_scratch: &mut [Option<(usize, usize)>],
        allow_missing: bool,
    ) -> Result<ArrayRef>
    where
        O: OffsetSizeTrait,
    {
        shared_gather_rows_from_chunks_string::<O>(
            row_ids,
            row_locator,
            len,
            candidate_indices,
            &plan.value_metas,
            &plan.row_metas,
            chunk_arrays,
            row_scratch,
            allow_missing,
        )
    }

    /// Delegates to [`shared_gather_rows_single_shot_binary`].
    fn gather_rows_single_shot_binary<O>(
        row_index: &FxHashMap<u64, usize>,
        len: usize,
        plan: &FieldPlan,
        chunk_blobs: &mut FxHashMap<PhysicalKey, EntryHandle>,
        allow_missing: bool,
    ) -> Result<ArrayRef>
    where
        O: OffsetSizeTrait,
    {
        shared_gather_rows_single_shot_binary::<O>(
            row_index,
            len,
            &plan.value_metas,
            &plan.row_metas,
            &plan.candidate_indices,
            chunk_blobs,
            allow_missing,
        )
    }

    /// Delegates to [`shared_gather_rows_from_chunks_binary`] (cached-chunk
    /// path).
    #[allow(clippy::too_many_arguments)]
    fn gather_rows_from_chunks_binary<O>(
        row_ids: &[u64],
        row_locator: RowLocator,
        len: usize,
        candidate_indices: &[usize],
        plan: &FieldPlan,
        chunk_arrays: &FxHashMap<PhysicalKey, ArrayRef>,
        row_scratch: &mut [Option<(usize, usize)>],
        allow_missing: bool,
    ) -> Result<ArrayRef>
    where
        O: OffsetSizeTrait,
    {
        shared_gather_rows_from_chunks_binary::<O>(
            row_ids,
            row_locator,
            len,
            candidate_indices,
            &plan.value_metas,
            &plan.row_metas,
            chunk_arrays,
            row_scratch,
            allow_missing,
        )
    }

    /// Delegates to [`shared_gather_rows_from_chunks_bool`] (cached-chunk
    /// path).
    #[allow(clippy::too_many_arguments)]
    fn gather_rows_from_chunks_bool(
        row_ids: &[u64],
        row_locator: RowLocator,
        len: usize,
        candidate_indices: &[usize],
        plan: &FieldPlan,
        chunk_arrays: &FxHashMap<PhysicalKey, ArrayRef>,
        row_scratch: &mut [Option<(usize, usize)>],
        allow_missing: bool,
    ) -> Result<ArrayRef> {
        shared_gather_rows_from_chunks_bool(
            row_ids,
            row_locator,
            len,
            candidate_indices,
            &plan.value_metas,
            &plan.row_metas,
            chunk_arrays,
            row_scratch,
            allow_missing,
        )
    }

    /// Delegates to [`shared_gather_rows_from_chunks_struct`] (cached-chunk
    /// path).
    #[allow(clippy::too_many_arguments)]
    fn gather_rows_from_chunks_struct(
        row_locator: RowLocator,
        len: usize,
        candidate_indices: &[usize],
        plan: &FieldPlan,
        chunk_arrays: &FxHashMap<PhysicalKey, ArrayRef>,
        row_scratch: &mut [Option<(usize, usize)>],
        allow_missing: bool,
        dtype: &DataType,
    ) -> Result<ArrayRef> {
        shared_gather_rows_from_chunks_struct(
            row_locator,
            len,
            candidate_indices,
            &plan.value_metas,
            &plan.row_metas,
            chunk_arrays,
            row_scratch,
            allow_missing,
            dtype,
        )
    }

    /// Delegates to [`shared_gather_rows_from_chunks`] for primitive types
    /// (cached-chunk path).
    #[allow(clippy::too_many_arguments)]
    fn gather_rows_from_chunks<T>(
        row_ids: &[u64],
        row_locator: RowLocator,
        len: usize,
        candidate_indices: &[usize],
        plan: &FieldPlan,
        chunk_arrays: &FxHashMap<PhysicalKey, ArrayRef>,
        row_scratch: &mut [Option<(usize, usize)>],
        allow_missing: bool,
    ) -> Result<ArrayRef>
    where
        T: ArrowPrimitiveType,
    {
        shared_gather_rows_from_chunks::<T>(
            row_ids,
            row_locator,
            len,
            candidate_indices,
            &plan.value_metas,
            &plan.row_metas,
            chunk_arrays,
            row_scratch,
            allow_missing,
        )
    }

    /// Delegates to [`shared_filter_rows_with_non_null`]: keeps only output
    /// rows where at least one column is non-null.
    fn filter_rows_with_non_null(columns: Vec<ArrayRef>) -> Result<Vec<ArrayRef>> {
        shared_filter_rows_with_non_null(columns)
    }
}