1use super::*;
8use crate::gather::{
9 RowLocator, filter_rows_with_non_null as shared_filter_rows_with_non_null,
10 gather_rows_from_chunks as shared_gather_rows_from_chunks,
11 gather_rows_from_chunks_binary as shared_gather_rows_from_chunks_binary,
12 gather_rows_from_chunks_bool as shared_gather_rows_from_chunks_bool,
13 gather_rows_from_chunks_string as shared_gather_rows_from_chunks_string,
14 gather_rows_from_chunks_struct as shared_gather_rows_from_chunks_struct,
15 gather_rows_single_shot as shared_gather_rows_single_shot,
16 gather_rows_single_shot_binary as shared_gather_rows_single_shot_binary,
17 gather_rows_single_shot_bool as shared_gather_rows_single_shot_bool,
18 gather_rows_single_shot_string as shared_gather_rows_single_shot_string,
19 gather_rows_single_shot_struct as shared_gather_rows_single_shot_struct,
20};
21use crate::store::descriptor::{ChunkMetadata, ColumnDescriptor, DescriptorIterator};
22use crate::types::{LogicalFieldId, RowId};
23use arrow::array::{ArrayRef, OffsetSizeTrait, new_empty_array};
24use arrow::datatypes::{ArrowPrimitiveType, DataType, Field, Schema};
25use arrow::record_batch::RecordBatch;
26use llkv_result::{Error, Result};
27use llkv_storage::{
28 pager::{BatchGet, GetResult, Pager},
29 serialization::deserialize_array,
30 types::PhysicalKey,
31};
32use rustc_hash::{FxHashMap, FxHashSet};
33use simd_r_drive_entry_handle::EntryHandle;
34use std::borrow::Cow;
35use std::sync::Arc;
36
/// Controls how a gather treats requested row ids that are missing from a
/// column.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum GatherNullPolicy {
    /// Fail the gather if any requested row id has no stored value.
    ErrorOnMissing,
    /// Materialize a null for every missing row id.
    IncludeNulls,
    /// Materialize nulls first, then post-filter the gathered rows through
    /// the shared `filter_rows_with_non_null` helper.
    DropNulls,
}
46
/// A single projected column: a logical field plus an optional output alias.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct Projection {
    /// The column to project.
    pub logical_field_id: LogicalFieldId,
    /// Optional output column name.
    pub alias: Option<String>,
}
52
53impl Projection {
54 pub fn new(logical_field_id: LogicalFieldId) -> Self {
55 Self {
56 logical_field_id,
57 alias: None,
58 }
59 }
60
61 pub fn with_alias<S: Into<String>>(logical_field_id: LogicalFieldId, alias: S) -> Self {
62 Self {
63 logical_field_id,
64 alias: Some(alias.into()),
65 }
66 }
67}
68
69impl From<LogicalFieldId> for Projection {
70 fn from(logical_field_id: LogicalFieldId) -> Self {
71 Projection::new(logical_field_id)
72 }
73}
74
75impl<S: Into<String>> From<(LogicalFieldId, S)> for Projection {
76 fn from(value: (LogicalFieldId, S)) -> Self {
77 Projection::with_alias(value.0, value.1)
78 }
79}
80
81impl GatherNullPolicy {
82 #[inline]
83 fn allow_missing(self) -> bool {
84 !matches!(self, Self::ErrorOnMissing)
85 }
86}
87
/// Reusable state for multi-column row gathers.
///
/// Keeping one of these alive across repeated calls to
/// `gather_rows_with_reusable_context` lets the store reuse deserialized
/// chunk arrays and scratch allocations instead of rebuilding them per call.
pub struct MultiGatherContext {
    // One (field id, data type) pair per projected column, in output order.
    field_infos: Vec<(LogicalFieldId, DataType)>,
    // Per-field chunk metadata and candidate-chunk scratch, parallel to `field_infos`.
    plans: Vec<FieldPlan>,
    // Deserialized chunk arrays keyed by physical key; persists across calls.
    chunk_cache: FxHashMap<PhysicalKey, ArrayRef>,
    // Scratch map from row id to its output position (sparse row lookup).
    row_index: FxHashMap<u64, usize>,
    // Per-row scratch locations consumed by the shared gather kernels;
    // presumably (chunk index, offset in chunk) — confirm in crate::gather.
    row_scratch: Vec<Option<(usize, usize)>>,
    // Scratch list of chunk physical keys pending fetch.
    chunk_keys: Vec<PhysicalKey>,
}
96
97impl MultiGatherContext {
98 fn new(field_infos: Vec<(LogicalFieldId, DataType)>, plans: Vec<FieldPlan>) -> Self {
99 Self {
100 chunk_cache: FxHashMap::default(),
101 row_index: FxHashMap::default(),
102 row_scratch: Vec::new(),
103 chunk_keys: Vec::new(),
104 field_infos,
105 plans,
106 }
107 }
108
109 #[inline]
110 pub fn is_empty(&self) -> bool {
111 self.plans.is_empty()
112 }
113
114 #[inline]
115 fn field_infos(&self) -> &[(LogicalFieldId, DataType)] {
116 &self.field_infos
117 }
118
119 #[inline]
120 fn plans(&self) -> &[FieldPlan] {
121 &self.plans
122 }
123
124 #[inline]
125 fn chunk_cache(&self) -> &FxHashMap<PhysicalKey, ArrayRef> {
126 &self.chunk_cache
127 }
128
129 #[inline]
130 fn chunk_cache_mut(&mut self) -> &mut FxHashMap<PhysicalKey, ArrayRef> {
131 &mut self.chunk_cache
132 }
133
134 #[inline]
135 fn plans_mut(&mut self) -> &mut [FieldPlan] {
136 &mut self.plans
137 }
138
139 fn take_chunk_keys(&mut self) -> Vec<PhysicalKey> {
140 std::mem::take(&mut self.chunk_keys)
141 }
142
143 fn store_chunk_keys(&mut self, keys: Vec<PhysicalKey>) {
144 self.chunk_keys = keys;
145 }
146
147 fn take_row_index(&mut self) -> FxHashMap<u64, usize> {
148 std::mem::take(&mut self.row_index)
149 }
150
151 fn store_row_index(&mut self, row_index: FxHashMap<u64, usize>) {
152 self.row_index = row_index;
153 }
154
155 fn take_row_scratch(&mut self) -> Vec<Option<(usize, usize)>> {
156 std::mem::take(&mut self.row_scratch)
157 }
158
159 fn store_row_scratch(&mut self, scratch: Vec<Option<(usize, usize)>>) {
160 self.row_scratch = scratch;
161 }
162
163 pub fn chunk_span_for_row(&self, row_id: RowId) -> Option<(usize, RowId, RowId)> {
164 let first_plan = self.plans.first()?;
165 let mut chunk_idx = None;
166 for (idx, meta) in first_plan.row_metas.iter().enumerate() {
167 if row_id >= meta.min_val_u64 && row_id <= meta.max_val_u64 {
168 chunk_idx = Some(idx);
169 break;
170 }
171 }
172 if chunk_idx.is_none() {
173 let total_chunks = first_plan.row_metas.len();
174 'outer: for idx in 0..total_chunks {
175 for plan in &self.plans {
176 let meta = &plan.row_metas[idx];
177 if row_id >= meta.min_val_u64 && row_id <= meta.max_val_u64 {
178 chunk_idx = Some(idx);
179 break 'outer;
180 }
181 }
182 }
183 }
184 let idx = chunk_idx?;
185
186 let mut span_min = u64::MAX;
187 let mut span_max = 0u64;
188 for plan in &self.plans {
189 let meta = plan.row_metas.get(idx)?;
190 span_min = span_min.min(meta.min_val_u64);
191 span_max = span_max.max(meta.max_val_u64);
192 }
193
194 if span_min > span_max {
195 return None;
196 }
197
198 Some((idx, span_min, span_max))
199 }
200}
201
/// Per-column gather plan: chunk metadata plus candidate-chunk scratch.
#[derive(Clone, Debug)]
struct FieldPlan {
    // Arrow data type of the column's values.
    dtype: DataType,
    // Metadata for each value chunk; parallel to `row_metas`.
    value_metas: Vec<ChunkMetadata>,
    // Metadata for each row-id chunk; parallel to `value_metas`.
    row_metas: Vec<ChunkMetadata>,
    // Indices of chunks that may contain requested rows; recomputed per gather.
    candidate_indices: Vec<usize>,
}
209
210impl<P> ColumnStore<P>
211where
212 P: Pager<Blob = EntryHandle> + Send + Sync,
213{
214 pub fn gather_rows(
219 &self,
220 field_ids: &[LogicalFieldId],
221 row_ids: &[u64],
222 policy: GatherNullPolicy,
223 ) -> Result<RecordBatch> {
224 let mut ctx = self.prepare_gather_context(field_ids)?;
225 self.execute_gather_single_pass(&mut ctx, row_ids, policy)
226 }
227
    /// Runs a complete gather over `ctx` without reusing cached chunk arrays:
    /// chunk blobs are fetched from the pager for this call only and decoded
    /// by the single-shot kernels.
    ///
    /// Output columns are named `field_<id>`; `policy` decides whether
    /// missing row ids error out, become nulls, or cause rows to be dropped.
    fn execute_gather_single_pass(
        &self,
        ctx: &mut MultiGatherContext,
        row_ids: &[u64],
        policy: GatherNullPolicy,
    ) -> Result<RecordBatch> {
        // No projected columns: an empty batch with an empty schema.
        if ctx.is_empty() {
            return Ok(RecordBatch::new_empty(Arc::new(Schema::empty())));
        }

        let field_infos = ctx.field_infos().to_vec();

        // No requested rows: keep the full schema but emit zero-length arrays.
        if row_ids.is_empty() {
            let mut arrays = Vec::with_capacity(field_infos.len());
            let mut fields = Vec::with_capacity(field_infos.len());
            for (fid, dtype) in &field_infos {
                arrays.push(new_empty_array(dtype));
                let field_name = format!("field_{}", u64::from(*fid));
                fields.push(Field::new(field_name, dtype.clone(), true));
            }
            let schema = Arc::new(Schema::new(fields));
            return RecordBatch::try_new(schema, arrays)
                .map_err(|e| Error::Internal(format!("gather_rows_multi empty batch: {e}")));
        }

        // Map each requested row id to its output slot, rejecting duplicates.
        let mut row_index: FxHashMap<u64, usize> =
            FxHashMap::with_capacity_and_hasher(row_ids.len(), Default::default());
        for (idx, &row_id) in row_ids.iter().enumerate() {
            if row_index.insert(row_id, idx).is_some() {
                return Err(Error::Internal(
                    "duplicate row_id in gather_rows_multi".into(),
                ));
            }
        }

        // Chunk pruning below requires the requested ids in sorted order.
        let mut sorted_row_ids = row_ids.to_vec();
        sorted_row_ids.sort_unstable();

        // Mark every chunk whose row-id range intersects the request and
        // collect the physical keys (value + row-id blobs) that must be read.
        let mut chunk_keys: FxHashSet<PhysicalKey> = FxHashSet::default();
        {
            let plans_mut = ctx.plans_mut();
            for plan in plans_mut.iter_mut() {
                plan.candidate_indices.clear();
                for (idx, meta) in plan.row_metas.iter().enumerate() {
                    if Self::chunk_intersects(&sorted_row_ids, meta) {
                        plan.candidate_indices.push(idx);
                        chunk_keys.insert(plan.value_metas[idx].chunk_pk);
                        chunk_keys.insert(plan.row_metas[idx].chunk_pk);
                    }
                }
            }
        }

        let mut chunk_requests = Vec::with_capacity(chunk_keys.len());
        for &key in &chunk_keys {
            chunk_requests.push(BatchGet::Raw { key });
        }

        // Fetch all needed chunk blobs in a single pager round trip.
        let mut chunk_map: FxHashMap<PhysicalKey, EntryHandle> =
            FxHashMap::with_capacity_and_hasher(chunk_requests.len(), Default::default());
        if !chunk_requests.is_empty() {
            let chunk_results = self.pager.batch_get(&chunk_requests)?;
            for result in chunk_results {
                if let GetResult::Raw { key, bytes } = result {
                    chunk_map.insert(key, bytes);
                }
            }
        }

        let allow_missing = policy.allow_missing();

        // Gather each column with the dtype-appropriate single-shot kernel;
        // the kernels take `chunk_map` mutably.
        let mut outputs = Vec::with_capacity(ctx.plans().len());
        for plan in ctx.plans() {
            let array = match &plan.dtype {
                DataType::Utf8 => Self::gather_rows_single_shot_string::<i32>(
                    &row_index,
                    row_ids.len(),
                    plan,
                    &mut chunk_map,
                    allow_missing,
                ),
                DataType::LargeUtf8 => Self::gather_rows_single_shot_string::<i64>(
                    &row_index,
                    row_ids.len(),
                    plan,
                    &mut chunk_map,
                    allow_missing,
                ),
                DataType::Binary => Self::gather_rows_single_shot_binary::<i32>(
                    &row_index,
                    row_ids.len(),
                    plan,
                    &mut chunk_map,
                    allow_missing,
                ),
                DataType::LargeBinary => Self::gather_rows_single_shot_binary::<i64>(
                    &row_index,
                    row_ids.len(),
                    plan,
                    &mut chunk_map,
                    allow_missing,
                ),
                DataType::Boolean => Self::gather_rows_single_shot_bool(
                    &row_index,
                    row_ids.len(),
                    plan,
                    &mut chunk_map,
                    allow_missing,
                ),
                DataType::Struct(_) => Self::gather_rows_single_shot_struct(
                    &row_index,
                    row_ids.len(),
                    plan,
                    &mut chunk_map,
                    allow_missing,
                    &plan.dtype,
                ),
                other => with_integer_arrow_type!(
                    other.clone(),
                    |ArrowTy| {
                        Self::gather_rows_single_shot::<ArrowTy>(
                            &row_index,
                            row_ids.len(),
                            plan,
                            &mut chunk_map,
                            allow_missing,
                        )
                    },
                    Err(Error::Internal(format!(
                        "gather_rows_multi: unsupported dtype {:?}",
                        other
                    ))),
                ),
            }?;
            outputs.push(array);
        }

        // DropNulls: post-filter gathered rows through the shared row filter.
        let outputs = if matches!(policy, GatherNullPolicy::DropNulls) {
            Self::filter_rows_with_non_null(outputs)?
        } else {
            outputs
        };

        // Only advertise nullability when nulls can actually appear.
        let mut fields = Vec::with_capacity(field_infos.len());
        for (idx, (fid, dtype)) in field_infos.iter().enumerate() {
            let array = &outputs[idx];
            let field_name = format!("field_{}", u64::from(*fid));
            let nullable = match policy {
                GatherNullPolicy::IncludeNulls => true,
                _ => array.null_count() > 0,
            };
            fields.push(Field::new(field_name, dtype.clone(), nullable));
        }

        let schema = Arc::new(Schema::new(fields));
        RecordBatch::try_new(schema, outputs)
            .map_err(|e| Error::Internal(format!("gather_rows_multi batch: {e}")))
    }
391
    /// Resolves `field_ids` into a reusable [`MultiGatherContext`].
    ///
    /// Looks up each field's value column and its companion row-id column in
    /// the catalog, loads both column descriptors in one batched pager read,
    /// and collects their non-empty chunk metadata into per-field plans.
    ///
    /// # Errors
    /// Returns [`Error::NotFound`] when a field (or its row-id companion) has
    /// no catalog entry or descriptor blob, and an internal error when a
    /// column's value/row-id chunk counts disagree.
    pub fn prepare_gather_context(
        &self,
        field_ids: &[LogicalFieldId],
    ) -> Result<MultiGatherContext> {
        let mut field_infos = Vec::with_capacity(field_ids.len());
        for &fid in field_ids {
            field_infos.push((fid, self.data_type(fid)?));
        }

        if field_infos.is_empty() {
            return Ok(MultiGatherContext::new(Vec::new(), Vec::new()));
        }

        // Each value column has a companion row-id column keyed by
        // `rowid_fid(fid)`; resolve both physical keys under one catalog lock.
        let catalog = self.catalog.read().unwrap();
        let mut key_pairs = Vec::with_capacity(field_infos.len());
        for (fid, _) in &field_infos {
            let value_pk = *catalog.map.get(fid).ok_or(Error::NotFound)?;
            let row_pk = *catalog.map.get(&rowid_fid(*fid)).ok_or(Error::NotFound)?;
            key_pairs.push((value_pk, row_pk));
        }
        drop(catalog);

        // Fetch all descriptor blobs in a single pager round trip.
        let mut descriptor_requests = Vec::with_capacity(key_pairs.len() * 2);
        for (value_pk, row_pk) in &key_pairs {
            descriptor_requests.push(BatchGet::Raw { key: *value_pk });
            descriptor_requests.push(BatchGet::Raw { key: *row_pk });
        }
        let descriptor_results = self.pager.batch_get(&descriptor_requests)?;
        let mut descriptor_map: FxHashMap<PhysicalKey, EntryHandle> = FxHashMap::default();
        for result in descriptor_results {
            if let GetResult::Raw { key, bytes } = result {
                descriptor_map.insert(key, bytes);
            }
        }

        let mut plans = Vec::with_capacity(field_infos.len());
        for ((_, dtype), (value_pk, row_pk)) in field_infos.iter().zip(key_pairs.iter()) {
            let value_desc_blob = descriptor_map.remove(value_pk).ok_or(Error::NotFound)?;
            let value_desc = ColumnDescriptor::from_le_bytes(value_desc_blob.as_ref());
            let value_metas =
                Self::collect_non_empty_metas(self.pager.as_ref(), value_desc.head_page_pk)?;

            let row_desc_blob = descriptor_map.remove(row_pk).ok_or(Error::NotFound)?;
            let row_desc = ColumnDescriptor::from_le_bytes(row_desc_blob.as_ref());
            let row_metas =
                Self::collect_non_empty_metas(self.pager.as_ref(), row_desc.head_page_pk)?;

            // Value and row-id chunks must stay in lockstep for gathering.
            if value_metas.len() != row_metas.len() {
                return Err(Error::Internal(
                    "gather_rows_multi: chunk count mismatch".into(),
                ));
            }

            plans.push(FieldPlan {
                dtype: dtype.clone(),
                value_metas,
                row_metas,
                candidate_indices: Vec::new(),
            });
        }

        Ok(MultiGatherContext::new(field_infos, plans))
    }
455
    /// Gathers `row_ids` using a context prepared by `prepare_gather_context`,
    /// reusing its deserialized-chunk cache and scratch buffers across calls.
    ///
    /// Unlike `gather_rows`, decoded chunk arrays persist in `ctx`, so
    /// repeated gathers over overlapping chunks avoid refetching and
    /// re-deserializing. Output columns are named `field_<id>`; `policy`
    /// decides how missing row ids are treated.
    pub fn gather_rows_with_reusable_context(
        &self,
        ctx: &mut MultiGatherContext,
        row_ids: &[u64],
        policy: GatherNullPolicy,
    ) -> Result<RecordBatch> {
        // No projected columns: an empty batch with an empty schema.
        if ctx.is_empty() {
            return Ok(RecordBatch::new_empty(Arc::new(Schema::empty())));
        }

        // No requested rows: keep the full schema but emit zero-length arrays.
        if row_ids.is_empty() {
            let mut arrays = Vec::with_capacity(ctx.field_infos().len());
            let mut fields = Vec::with_capacity(ctx.field_infos().len());
            for (fid, dtype) in ctx.field_infos() {
                arrays.push(new_empty_array(dtype));
                let field_name = format!("field_{}", u64::from(*fid));
                fields.push(Field::new(field_name, dtype.clone(), true));
            }
            let schema = Arc::new(Schema::new(fields));
            return RecordBatch::try_new(schema, arrays)
                .map_err(|e| Error::Internal(format!("gather_rows_multi empty batch: {e}")));
        }

        // Move the scratch buffers out of `ctx` so they can be used alongside
        // `ctx`'s plans/cache; they are stored back below on every path.
        let mut row_index = ctx.take_row_index();
        let mut row_scratch = ctx.take_row_scratch();

        let field_infos = ctx.field_infos().to_vec();
        let mut chunk_keys = ctx.take_chunk_keys();

        // Run the gather in a closure so the scratch buffers are returned to
        // `ctx` even when an early `Err` is produced inside.
        let result: Result<RecordBatch> = (|| {
            let len = row_ids.len();
            if row_scratch.len() < len {
                row_scratch.resize(len, None);
            }

            // Avoid copying/sorting the request when it is already ordered.
            let is_non_decreasing = len <= 1 || row_ids.windows(2).all(|w| w[0] <= w[1]);
            let sorted_row_ids_cow: Cow<'_, [u64]> = if is_non_decreasing {
                Cow::Borrowed(row_ids)
            } else {
                let mut buf = row_ids.to_vec();
                buf.sort_unstable();
                Cow::Owned(buf)
            };
            let sorted_row_ids: &[u64] = sorted_row_ids_cow.as_ref();

            // A consecutive run of row ids can be located by offset from a
            // dense base instead of through the hash-map index.
            let dense_base = if len == 0 {
                None
            } else if len == 1 || is_non_decreasing && row_ids.windows(2).all(|w| w[1] == w[0] + 1)
            {
                Some(row_ids[0])
            } else {
                None
            };

            if dense_base.is_none() {
                // Sparse path: map each row id to its output slot, rejecting
                // duplicates. (A dense run cannot contain duplicates.)
                row_index.clear();
                row_index.reserve(len);
                for (idx, &row_id) in row_ids.iter().enumerate() {
                    if row_index.insert(row_id, idx).is_some() {
                        return Err(Error::Internal(
                            "duplicate row_id in gather_rows_multi".into(),
                        ));
                    }
                }
            } else {
                row_index.clear();
            }

            let row_locator = if let Some(base) = dense_base {
                RowLocator::Dense { base }
            } else {
                RowLocator::Sparse { index: &row_index }
            };

            chunk_keys.clear();

            // Mark every chunk whose row-id range intersects the request and
            // record the blobs (values + row ids) that may need fetching.
            {
                let plans_mut = ctx.plans_mut();
                for plan in plans_mut.iter_mut() {
                    plan.candidate_indices.clear();
                    for (idx, meta) in plan.row_metas.iter().enumerate() {
                        if Self::chunk_intersects(sorted_row_ids, meta) {
                            plan.candidate_indices.push(idx);
                            chunk_keys.push(plan.value_metas[idx].chunk_pk);
                            chunk_keys.push(plan.row_metas[idx].chunk_pk);
                        }
                    }
                }
            }

            chunk_keys.sort_unstable();
            chunk_keys.dedup();

            // Fetch and decode only the chunks missing from the cache.
            {
                let mut pending: Vec<BatchGet> = Vec::new();
                {
                    let cache = ctx.chunk_cache();
                    for &key in &chunk_keys {
                        if !cache.contains_key(&key) {
                            pending.push(BatchGet::Raw { key });
                        }
                    }
                }

                if !pending.is_empty() {
                    let chunk_results = self.pager.batch_get(&pending)?;
                    let cache = ctx.chunk_cache_mut();
                    for result in chunk_results {
                        if let GetResult::Raw { key, bytes } = result {
                            let array = deserialize_array(bytes)?;
                            cache.insert(key, Arc::clone(&array));
                        }
                    }
                }
            }

            let allow_missing = policy.allow_missing();

            // Gather each column with the dtype-appropriate cached-chunk kernel.
            let mut outputs = Vec::with_capacity(ctx.plans().len());
            for plan in ctx.plans() {
                let array = match &plan.dtype {
                    DataType::Utf8 => Self::gather_rows_from_chunks_string::<i32>(
                        row_ids,
                        row_locator,
                        len,
                        &plan.candidate_indices,
                        plan,
                        ctx.chunk_cache(),
                        &mut row_scratch,
                        allow_missing,
                    ),
                    DataType::LargeUtf8 => Self::gather_rows_from_chunks_string::<i64>(
                        row_ids,
                        row_locator,
                        len,
                        &plan.candidate_indices,
                        plan,
                        ctx.chunk_cache(),
                        &mut row_scratch,
                        allow_missing,
                    ),
                    DataType::Binary => Self::gather_rows_from_chunks_binary::<i32>(
                        row_ids,
                        row_locator,
                        len,
                        &plan.candidate_indices,
                        plan,
                        ctx.chunk_cache(),
                        &mut row_scratch,
                        allow_missing,
                    ),
                    DataType::LargeBinary => Self::gather_rows_from_chunks_binary::<i64>(
                        row_ids,
                        row_locator,
                        len,
                        &plan.candidate_indices,
                        plan,
                        ctx.chunk_cache(),
                        &mut row_scratch,
                        allow_missing,
                    ),
                    DataType::Boolean => Self::gather_rows_from_chunks_bool(
                        row_ids,
                        row_locator,
                        len,
                        &plan.candidate_indices,
                        plan,
                        ctx.chunk_cache(),
                        &mut row_scratch,
                        allow_missing,
                    ),
                    DataType::Struct(_) => Self::gather_rows_from_chunks_struct(
                        row_locator,
                        len,
                        &plan.candidate_indices,
                        plan,
                        ctx.chunk_cache(),
                        &mut row_scratch,
                        allow_missing,
                        &plan.dtype,
                    ),
                    other => with_integer_arrow_type!(
                        other.clone(),
                        |ArrowTy| {
                            Self::gather_rows_from_chunks::<ArrowTy>(
                                row_ids,
                                row_locator,
                                len,
                                &plan.candidate_indices,
                                plan,
                                ctx.chunk_cache(),
                                &mut row_scratch,
                                allow_missing,
                            )
                        },
                        Err(Error::Internal(format!(
                            "gather_rows_multi: unsupported dtype {:?}",
                            other
                        ))),
                    ),
                }?;
                outputs.push(array);
            }

            // DropNulls: post-filter gathered rows through the shared row filter.
            let outputs = if matches!(policy, GatherNullPolicy::DropNulls) {
                Self::filter_rows_with_non_null(outputs)?
            } else {
                outputs
            };

            // Only advertise nullability when nulls can actually appear.
            let mut fields = Vec::with_capacity(field_infos.len());
            for (idx, (fid, dtype)) in field_infos.iter().enumerate() {
                let array = &outputs[idx];
                let field_name = format!("field_{}", u64::from(*fid));
                let nullable = match policy {
                    GatherNullPolicy::IncludeNulls => true,
                    _ => array.null_count() > 0,
                };
                fields.push(Field::new(field_name, dtype.clone(), nullable));
            }

            let schema = Arc::new(Schema::new(fields));
            RecordBatch::try_new(schema, outputs)
                .map_err(|e| Error::Internal(format!("gather_rows_multi batch: {e}")))
        })();

        // Return the scratch allocations to the context for the next call.
        ctx.store_row_scratch(row_scratch);
        ctx.store_row_index(row_index);
        ctx.store_chunk_keys(chunk_keys);

        result
    }
692
693 fn collect_non_empty_metas(pager: &P, head_page_pk: PhysicalKey) -> Result<Vec<ChunkMetadata>> {
694 let mut metas = Vec::new();
695 if head_page_pk == 0 {
696 return Ok(metas);
697 }
698 for meta in DescriptorIterator::new(pager, head_page_pk) {
699 let meta = meta?;
700 if meta.row_count > 0 {
701 metas.push(meta);
702 }
703 }
704 Ok(metas)
705 }
706
707 #[inline]
708 fn chunk_intersects(sorted_row_ids: &[u64], meta: &ChunkMetadata) -> bool {
709 if sorted_row_ids.is_empty() || meta.row_count == 0 {
710 return false;
711 }
712 let min = meta.min_val_u64;
713 let max = meta.max_val_u64;
714 if min == 0 && max == 0 && meta.row_count > 0 {
715 return true;
716 }
717 if min > max {
718 return true;
719 }
720 let min_req = sorted_row_ids[0];
721 let max_req = *sorted_row_ids.last().unwrap();
722 if max < min_req || min > max_req {
723 return false;
724 }
725 let idx = sorted_row_ids.partition_point(|&rid| rid < min);
726 idx < sorted_row_ids.len() && sorted_row_ids[idx] <= max
727 }
728
729 fn gather_rows_single_shot_string<O>(
730 row_index: &FxHashMap<u64, usize>,
731 len: usize,
732 plan: &FieldPlan,
733 chunk_blobs: &mut FxHashMap<PhysicalKey, EntryHandle>,
734 allow_missing: bool,
735 ) -> Result<ArrayRef>
736 where
737 O: OffsetSizeTrait,
738 {
739 shared_gather_rows_single_shot_string::<O>(
740 row_index,
741 len,
742 &plan.value_metas,
743 &plan.row_metas,
744 &plan.candidate_indices,
745 chunk_blobs,
746 allow_missing,
747 )
748 }
749
750 #[allow(clippy::too_many_arguments)]
751 fn gather_rows_single_shot_bool(
752 row_index: &FxHashMap<u64, usize>,
753 len: usize,
754 plan: &FieldPlan,
755 chunk_blobs: &mut FxHashMap<PhysicalKey, EntryHandle>,
756 allow_missing: bool,
757 ) -> Result<ArrayRef> {
758 shared_gather_rows_single_shot_bool(
759 row_index,
760 len,
761 &plan.value_metas,
762 &plan.row_metas,
763 &plan.candidate_indices,
764 chunk_blobs,
765 allow_missing,
766 )
767 }
768
769 fn gather_rows_single_shot_struct(
770 row_index: &FxHashMap<u64, usize>,
771 len: usize,
772 plan: &FieldPlan,
773 chunk_blobs: &mut FxHashMap<PhysicalKey, EntryHandle>,
774 allow_missing: bool,
775 dtype: &DataType,
776 ) -> Result<ArrayRef> {
777 shared_gather_rows_single_shot_struct(
778 row_index,
779 len,
780 &plan.value_metas,
781 &plan.row_metas,
782 &plan.candidate_indices,
783 chunk_blobs,
784 allow_missing,
785 dtype,
786 )
787 }
788
789 #[allow(clippy::too_many_arguments)]
790 fn gather_rows_single_shot<T>(
791 row_index: &FxHashMap<u64, usize>,
792 len: usize,
793 plan: &FieldPlan,
794 chunk_blobs: &mut FxHashMap<PhysicalKey, EntryHandle>,
795 allow_missing: bool,
796 ) -> Result<ArrayRef>
797 where
798 T: ArrowPrimitiveType,
799 {
800 shared_gather_rows_single_shot::<T>(
801 row_index,
802 len,
803 &plan.value_metas,
804 &plan.row_metas,
805 &plan.candidate_indices,
806 chunk_blobs,
807 allow_missing,
808 )
809 }
810
811 #[allow(clippy::too_many_arguments)] fn gather_rows_from_chunks_string<O>(
813 row_ids: &[u64],
814 row_locator: RowLocator,
815 len: usize,
816 candidate_indices: &[usize],
817 plan: &FieldPlan,
818 chunk_arrays: &FxHashMap<PhysicalKey, ArrayRef>,
819 row_scratch: &mut [Option<(usize, usize)>],
820 allow_missing: bool,
821 ) -> Result<ArrayRef>
822 where
823 O: OffsetSizeTrait,
824 {
825 shared_gather_rows_from_chunks_string::<O>(
826 row_ids,
827 row_locator,
828 len,
829 candidate_indices,
830 &plan.value_metas,
831 &plan.row_metas,
832 chunk_arrays,
833 row_scratch,
834 allow_missing,
835 )
836 }
837
838 fn gather_rows_single_shot_binary<O>(
839 row_index: &FxHashMap<u64, usize>,
840 len: usize,
841 plan: &FieldPlan,
842 chunk_blobs: &mut FxHashMap<PhysicalKey, EntryHandle>,
843 allow_missing: bool,
844 ) -> Result<ArrayRef>
845 where
846 O: OffsetSizeTrait,
847 {
848 shared_gather_rows_single_shot_binary::<O>(
849 row_index,
850 len,
851 &plan.value_metas,
852 &plan.row_metas,
853 &plan.candidate_indices,
854 chunk_blobs,
855 allow_missing,
856 )
857 }
858
859 #[allow(clippy::too_many_arguments)] fn gather_rows_from_chunks_binary<O>(
861 row_ids: &[u64],
862 row_locator: RowLocator,
863 len: usize,
864 candidate_indices: &[usize],
865 plan: &FieldPlan,
866 chunk_arrays: &FxHashMap<PhysicalKey, ArrayRef>,
867 row_scratch: &mut [Option<(usize, usize)>],
868 allow_missing: bool,
869 ) -> Result<ArrayRef>
870 where
871 O: OffsetSizeTrait,
872 {
873 shared_gather_rows_from_chunks_binary::<O>(
874 row_ids,
875 row_locator,
876 len,
877 candidate_indices,
878 &plan.value_metas,
879 &plan.row_metas,
880 chunk_arrays,
881 row_scratch,
882 allow_missing,
883 )
884 }
885
886 #[allow(clippy::too_many_arguments)] fn gather_rows_from_chunks_bool(
888 row_ids: &[u64],
889 row_locator: RowLocator,
890 len: usize,
891 candidate_indices: &[usize],
892 plan: &FieldPlan,
893 chunk_arrays: &FxHashMap<PhysicalKey, ArrayRef>,
894 row_scratch: &mut [Option<(usize, usize)>],
895 allow_missing: bool,
896 ) -> Result<ArrayRef> {
897 shared_gather_rows_from_chunks_bool(
898 row_ids,
899 row_locator,
900 len,
901 candidate_indices,
902 &plan.value_metas,
903 &plan.row_metas,
904 chunk_arrays,
905 row_scratch,
906 allow_missing,
907 )
908 }
909
910 #[allow(clippy::too_many_arguments)] fn gather_rows_from_chunks_struct(
912 row_locator: RowLocator,
913 len: usize,
914 candidate_indices: &[usize],
915 plan: &FieldPlan,
916 chunk_arrays: &FxHashMap<PhysicalKey, ArrayRef>,
917 row_scratch: &mut [Option<(usize, usize)>],
918 allow_missing: bool,
919 dtype: &DataType,
920 ) -> Result<ArrayRef> {
921 shared_gather_rows_from_chunks_struct(
922 row_locator,
923 len,
924 candidate_indices,
925 &plan.value_metas,
926 &plan.row_metas,
927 chunk_arrays,
928 row_scratch,
929 allow_missing,
930 dtype,
931 )
932 }
933
934 #[allow(clippy::too_many_arguments)] fn gather_rows_from_chunks<T>(
936 row_ids: &[u64],
937 row_locator: RowLocator,
938 len: usize,
939 candidate_indices: &[usize],
940 plan: &FieldPlan,
941 chunk_arrays: &FxHashMap<PhysicalKey, ArrayRef>,
942 row_scratch: &mut [Option<(usize, usize)>],
943 allow_missing: bool,
944 ) -> Result<ArrayRef>
945 where
946 T: ArrowPrimitiveType,
947 {
948 shared_gather_rows_from_chunks::<T>(
949 row_ids,
950 row_locator,
951 len,
952 candidate_indices,
953 &plan.value_metas,
954 &plan.row_metas,
955 chunk_arrays,
956 row_scratch,
957 allow_missing,
958 )
959 }
960
    /// Applies the shared non-null row filter to a gathered column set.
    ///
    /// Delegates to `crate::gather::filter_rows_with_non_null`; used as the
    /// post-processing step for `GatherNullPolicy::DropNulls`.
    fn filter_rows_with_non_null(columns: Vec<ArrayRef>) -> Result<Vec<ArrayRef>> {
        shared_filter_rows_with_non_null(columns)
    }
964}