1use super::*;
8use crate::gather::{
9 RowLocator, filter_rows_with_non_null as shared_filter_rows_with_non_null,
10 gather_rows_from_chunks as shared_gather_rows_from_chunks,
11 gather_rows_from_chunks_binary as shared_gather_rows_from_chunks_binary,
12 gather_rows_from_chunks_bool as shared_gather_rows_from_chunks_bool,
13 gather_rows_from_chunks_decimal128 as shared_gather_rows_from_chunks_decimal128,
14 gather_rows_from_chunks_string as shared_gather_rows_from_chunks_string,
15 gather_rows_from_chunks_struct as shared_gather_rows_from_chunks_struct,
16 gather_rows_single_shot as shared_gather_rows_single_shot,
17 gather_rows_single_shot_binary as shared_gather_rows_single_shot_binary,
18 gather_rows_single_shot_bool as shared_gather_rows_single_shot_bool,
19 gather_rows_single_shot_decimal128 as shared_gather_rows_single_shot_decimal128,
20 gather_rows_single_shot_string as shared_gather_rows_single_shot_string,
21 gather_rows_single_shot_string_view as shared_gather_rows_single_shot_string_view,
22 gather_rows_single_shot_struct as shared_gather_rows_single_shot_struct,
23};
24use crate::serialization::deserialize_array;
25use crate::store::descriptor::{ChunkMetadata, ColumnDescriptor, DescriptorIterator};
26use arrow::array::{ArrayRef, OffsetSizeTrait, new_empty_array};
27use arrow::datatypes::{ArrowPrimitiveType, DataType, Field, Schema};
28use arrow::record_batch::RecordBatch;
29use llkv_result::{Error, Result};
30use llkv_storage::{
31 pager::{BatchGet, GetResult, Pager},
32 types::PhysicalKey,
33};
34use llkv_types::ids::{LogicalFieldId, RowId};
35use rustc_hash::{FxHashMap, FxHashSet};
36use simd_r_drive_entry_handle::EntryHandle;
37use std::borrow::Cow;
38use std::sync::Arc;
39
/// Controls how the `gather_rows*` entry points treat requested row ids
/// that have no stored value in one or more columns.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum GatherNullPolicy {
    /// Fail the whole gather if any requested row id is missing.
    ErrorOnMissing,
    /// Emit a null for each missing value; output row count matches input.
    IncludeNulls,
    /// Emit nulls for missing values, then filter the output through
    /// `filter_rows_with_non_null` (presumably dropping rows where every
    /// gathered column is null — confirm in `crate::gather`).
    DropNulls,
}
49
/// A single column selection: a logical field id plus an optional alias.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct Projection {
    /// Identifies the column to project.
    pub logical_field_id: LogicalFieldId,
    /// Optional output name override; not consumed within this module —
    /// callers presumably apply it when building result schemas.
    pub alias: Option<String>,
}
59
60impl Projection {
61 pub fn new(logical_field_id: LogicalFieldId) -> Self {
62 Self {
63 logical_field_id,
64 alias: None,
65 }
66 }
67
68 pub fn with_alias<S: Into<String>>(logical_field_id: LogicalFieldId, alias: S) -> Self {
69 Self {
70 logical_field_id,
71 alias: Some(alias.into()),
72 }
73 }
74}
75
76impl From<LogicalFieldId> for Projection {
77 fn from(logical_field_id: LogicalFieldId) -> Self {
78 Projection::new(logical_field_id)
79 }
80}
81
82impl<S: Into<String>> From<(LogicalFieldId, S)> for Projection {
83 fn from(value: (LogicalFieldId, S)) -> Self {
84 Projection::with_alias(value.0, value.1)
85 }
86}
87
88impl GatherNullPolicy {
89 #[inline]
90 fn allow_missing(self) -> bool {
91 !matches!(self, Self::ErrorOnMissing)
92 }
93}
94
/// Reusable state for repeated multi-column gathers over the same field set.
///
/// Holds the per-field chunk plans plus scratch buffers and a chunk cache so
/// `gather_rows_with_reusable_context` avoids reallocating and re-fetching
/// across calls.
pub struct MultiGatherContext {
    // (field id, arrow dtype) per projected column, in output order.
    field_infos: Vec<(LogicalFieldId, DataType)>,
    // Pre-built `field_{id}` schema fields, parallel to `field_infos`.
    fields: Vec<Field>,
    // One gather plan per field, parallel to `field_infos`.
    plans: Vec<FieldPlan>,
    // Deserialized chunk arrays cached across gathers, keyed by physical key.
    chunk_cache: FxHashMap<PhysicalKey, ArrayRef>,
    // Scratch map row id -> output slot, reused between calls.
    row_index: FxHashMap<u64, usize>,
    // Per-output-row scratch slots handed to the shared gather kernels
    // (presumably (chunk idx, offset) — confirm in `crate::gather`).
    row_scratch: Vec<Option<(usize, usize)>>,
    // Scratch list of chunk physical keys to fetch, reused between calls.
    chunk_keys: Vec<PhysicalKey>,
}
108
109impl MultiGatherContext {
110 fn new(field_infos: Vec<(LogicalFieldId, DataType)>, plans: Vec<FieldPlan>) -> Self {
111 let fields: Vec<Field> = field_infos
114 .iter()
115 .map(|(fid, dtype)| {
116 let field_name = format!("field_{}", u64::from(*fid));
117 Field::new(field_name, dtype.clone(), true)
118 })
119 .collect();
120
121 Self {
122 chunk_cache: FxHashMap::default(),
123 row_index: FxHashMap::default(),
124 row_scratch: Vec::new(),
125 chunk_keys: Vec::new(),
126 field_infos,
127 fields,
128 plans,
129 }
130 }
131
132 #[inline]
133 pub fn is_empty(&self) -> bool {
134 self.plans.is_empty()
135 }
136
137 #[inline]
138 fn field_infos(&self) -> &[(LogicalFieldId, DataType)] {
139 &self.field_infos
140 }
141
142 #[inline]
143 fn fields(&self) -> &[Field] {
144 &self.fields
145 }
146
147 #[inline]
148 fn plans(&self) -> &[FieldPlan] {
149 &self.plans
150 }
151
152 #[inline]
153 fn chunk_cache(&self) -> &FxHashMap<PhysicalKey, ArrayRef> {
154 &self.chunk_cache
155 }
156
157 #[inline]
158 fn chunk_cache_mut(&mut self) -> &mut FxHashMap<PhysicalKey, ArrayRef> {
159 &mut self.chunk_cache
160 }
161
162 #[inline]
163 fn plans_mut(&mut self) -> &mut [FieldPlan] {
164 &mut self.plans
165 }
166
167 fn take_chunk_keys(&mut self) -> Vec<PhysicalKey> {
168 std::mem::take(&mut self.chunk_keys)
169 }
170
171 fn store_chunk_keys(&mut self, keys: Vec<PhysicalKey>) {
172 self.chunk_keys = keys;
173 }
174
175 fn take_row_index(&mut self) -> FxHashMap<u64, usize> {
176 std::mem::take(&mut self.row_index)
177 }
178
179 fn store_row_index(&mut self, row_index: FxHashMap<u64, usize>) {
180 self.row_index = row_index;
181 }
182
183 fn take_row_scratch(&mut self) -> Vec<Option<(usize, usize)>> {
184 std::mem::take(&mut self.row_scratch)
185 }
186
187 fn store_row_scratch(&mut self, scratch: Vec<Option<(usize, usize)>>) {
188 self.row_scratch = scratch;
189 }
190
191 pub fn chunk_span_for_row(&self, row_id: RowId) -> Option<(usize, RowId, RowId)> {
192 let first_plan = self.plans.first()?;
193 let mut chunk_idx = None;
194 for (idx, meta) in first_plan.row_metas.iter().enumerate() {
195 if row_id >= meta.min_val_u64 && row_id <= meta.max_val_u64 {
196 chunk_idx = Some(idx);
197 break;
198 }
199 }
200 if chunk_idx.is_none() {
201 let total_chunks = first_plan.row_metas.len();
202 'outer: for idx in 0..total_chunks {
203 for plan in &self.plans {
204 let meta = &plan.row_metas[idx];
205 if row_id >= meta.min_val_u64 && row_id <= meta.max_val_u64 {
206 chunk_idx = Some(idx);
207 break 'outer;
208 }
209 }
210 }
211 }
212 let idx = chunk_idx?;
213
214 let mut span_min = u64::MAX;
215 let mut span_max = 0u64;
216 for plan in &self.plans {
217 let meta = plan.row_metas.get(idx)?;
218 span_min = span_min.min(meta.min_val_u64);
219 span_max = span_max.max(meta.max_val_u64);
220 }
221
222 if span_min > span_max {
223 return None;
224 }
225
226 Some((idx, span_min, span_max))
227 }
228}
229
/// Per-field gather plan: chunk metadata for the value column and its
/// parallel row-id column.
#[derive(Clone, Debug)]
struct FieldPlan {
    /// Arrow data type of the value column.
    dtype: DataType,
    /// Metadata for each non-empty value chunk.
    value_metas: Vec<ChunkMetadata>,
    /// Metadata for each non-empty row-id chunk, parallel to `value_metas`
    /// (lengths are validated in `prepare_gather_context`).
    row_metas: Vec<ChunkMetadata>,
    /// Indices into the meta vectors for chunks that may contain requested
    /// rows; cleared and recomputed on every gather.
    candidate_indices: Vec<usize>,
}
237
238impl<P> ColumnStore<P>
239where
240 P: Pager<Blob = EntryHandle> + Send + Sync,
241{
242 pub fn gather_rows(
247 &self,
248 field_ids: &[LogicalFieldId],
249 row_ids: &[u64],
250 policy: GatherNullPolicy,
251 ) -> Result<RecordBatch> {
252 self.gather_rows_with_schema(field_ids, row_ids, policy, None)
253 }
254
255 pub fn gather_rows_with_schema(
261 &self,
262 field_ids: &[LogicalFieldId],
263 row_ids: &[u64],
264 policy: GatherNullPolicy,
265 expected_schema: Option<Arc<Schema>>,
266 ) -> Result<RecordBatch> {
267 let mut ctx = self.prepare_gather_context(field_ids)?;
268 self.execute_gather_single_pass_with_schema(&mut ctx, row_ids, policy, expected_schema)
269 }
270
    /// Core single-pass gather: select candidate chunks for every planned
    /// field, fetch their blobs in one pager round-trip, and materialize the
    /// requested rows into a `RecordBatch`.
    ///
    /// `expected_schema`, when provided, is used verbatim for the empty
    /// result; otherwise the schema is derived from the context's fields.
    ///
    /// Errors if `row_ids` contains duplicates, a dtype is unsupported, or
    /// the batch cannot be assembled.
    fn execute_gather_single_pass_with_schema(
        &self,
        ctx: &mut MultiGatherContext,
        row_ids: &[u64],
        policy: GatherNullPolicy,
        expected_schema: Option<Arc<Schema>>,
    ) -> Result<RecordBatch> {
        // No projected fields: nothing to gather.
        if ctx.is_empty() {
            return Ok(RecordBatch::new_empty(Arc::new(Schema::empty())));
        }

        let field_infos = ctx.field_infos().to_vec();

        // Empty row set: return an empty batch with the right schema.
        if row_ids.is_empty() {
            let (schema, arrays) = if let Some(expected_schema) = expected_schema {
                let mut arrays = Vec::with_capacity(expected_schema.fields().len());
                for field in expected_schema.fields() {
                    arrays.push(new_empty_array(field.data_type()));
                }
                (expected_schema, arrays)
            } else {
                let fields = ctx.fields();
                let mut arrays = Vec::with_capacity(fields.len());
                for field in fields {
                    arrays.push(new_empty_array(field.data_type()));
                }
                (Arc::new(Schema::new(fields.to_vec())), arrays)
            };
            return RecordBatch::try_new(schema, arrays)
                .map_err(|e| Error::Internal(format!("gather_rows_multi empty batch: {e}")));
        }

        // Map each requested row id to its output slot; duplicates are
        // rejected because each slot holds exactly one row.
        let mut row_index: FxHashMap<u64, usize> =
            FxHashMap::with_capacity_and_hasher(row_ids.len(), Default::default());
        for (idx, &row_id) in row_ids.iter().enumerate() {
            if row_index.insert(row_id, idx).is_some() {
                return Err(Error::Internal(
                    "duplicate row_id in gather_rows_multi".into(),
                ));
            }
        }

        // Sorted copy drives the per-chunk intersection pre-filter.
        let mut sorted_row_ids = row_ids.to_vec();
        sorted_row_ids.sort_unstable();

        // Select candidate chunks per field and collect the physical keys
        // (both value and row-id chunks) that must be fetched.
        let mut chunk_keys: FxHashSet<PhysicalKey> = FxHashSet::default();
        {
            let plans_mut = ctx.plans_mut();
            for plan in plans_mut.iter_mut() {
                plan.candidate_indices.clear();
                for (idx, meta) in plan.row_metas.iter().enumerate() {
                    if Self::chunk_intersects(&sorted_row_ids, meta) {
                        plan.candidate_indices.push(idx);
                        chunk_keys.insert(plan.value_metas[idx].chunk_pk);
                        chunk_keys.insert(plan.row_metas[idx].chunk_pk);
                    }
                }
            }
        }

        // Fetch every needed chunk blob in a single pager batch.
        let mut chunk_requests = Vec::with_capacity(chunk_keys.len());
        for &key in &chunk_keys {
            chunk_requests.push(BatchGet::Raw { key });
        }

        let mut chunk_map: FxHashMap<PhysicalKey, EntryHandle> =
            FxHashMap::with_capacity_and_hasher(chunk_requests.len(), Default::default());
        if !chunk_requests.is_empty() {
            let chunk_results = self.pager.batch_get(&chunk_requests)?;
            for result in chunk_results {
                if let GetResult::Raw { key, bytes } = result {
                    chunk_map.insert(key, bytes);
                }
            }
        }

        let allow_missing = policy.allow_missing();

        // Gather each field with the dtype-specific single-shot routine.
        let mut outputs = Vec::with_capacity(ctx.plans().len());
        for plan in ctx.plans() {
            let array = match &plan.dtype {
                DataType::Utf8 => Self::gather_rows_single_shot_string::<i32>(
                    &row_index,
                    row_ids.len(),
                    plan,
                    &mut chunk_map,
                    allow_missing,
                ),
                DataType::LargeUtf8 => Self::gather_rows_single_shot_string::<i64>(
                    &row_index,
                    row_ids.len(),
                    plan,
                    &mut chunk_map,
                    allow_missing,
                ),
                DataType::Binary => Self::gather_rows_single_shot_binary::<i32>(
                    &row_index,
                    row_ids.len(),
                    plan,
                    &mut chunk_map,
                    allow_missing,
                ),
                DataType::LargeBinary => Self::gather_rows_single_shot_binary::<i64>(
                    &row_index,
                    row_ids.len(),
                    plan,
                    &mut chunk_map,
                    allow_missing,
                ),
                DataType::Utf8View => Self::gather_rows_single_shot_string_view(
                    &row_index,
                    row_ids.len(),
                    plan,
                    &mut chunk_map,
                    allow_missing,
                ),
                DataType::Boolean => Self::gather_rows_single_shot_bool(
                    &row_index,
                    row_ids.len(),
                    plan,
                    &mut chunk_map,
                    allow_missing,
                ),
                DataType::Struct(_) => Self::gather_rows_single_shot_struct(
                    &row_index,
                    row_ids.len(),
                    plan,
                    &mut chunk_map,
                    allow_missing,
                    &plan.dtype,
                ),
                DataType::Decimal128(_, _) => Self::gather_rows_single_shot_decimal128(
                    &row_index,
                    row_ids.len(),
                    plan,
                    &mut chunk_map,
                    allow_missing,
                    &plan.dtype,
                ),
                // Remaining (integer-like primitive) dtypes dispatch through
                // the macro; anything else is unsupported.
                other => with_integer_arrow_type!(
                    other.clone(),
                    |ArrowTy| {
                        Self::gather_rows_single_shot::<ArrowTy>(
                            &row_index,
                            row_ids.len(),
                            plan,
                            &mut chunk_map,
                            allow_missing,
                        )
                    },
                    Err(Error::Internal(format!(
                        "gather_rows_multi: unsupported dtype {:?}",
                        other
                    ))),
                ),
            }?;
            outputs.push(array);
        }

        // DropNulls: post-filter the gathered columns.
        let outputs = if matches!(policy, GatherNullPolicy::DropNulls) {
            Self::filter_rows_with_non_null(outputs)?
        } else {
            outputs
        };

        // Field nullability mirrors the policy and the observed null counts.
        let mut fields = Vec::with_capacity(field_infos.len());
        for (idx, (fid, dtype)) in field_infos.iter().enumerate() {
            let array = &outputs[idx];
            let field_name = format!("field_{}", u64::from(*fid));
            let nullable = match policy {
                GatherNullPolicy::IncludeNulls => true,
                _ => array.null_count() > 0,
            };
            fields.push(Field::new(field_name, dtype.clone(), nullable));
        }

        let schema = Arc::new(Schema::new(fields));
        RecordBatch::try_new(schema, outputs)
            .map_err(|e| Error::Internal(format!("gather_rows_multi batch: {e}")))
    }
455
    /// Build a reusable [`MultiGatherContext`] for `field_ids`: resolve each
    /// field's value and row-id descriptors through the catalog, load their
    /// chunk metadata, and assemble one [`FieldPlan`] per field.
    ///
    /// Errors with `NotFound` if a field (or its row-id shadow column) is
    /// not in the catalog, and `Internal` if a field's value/row-id chunk
    /// counts disagree.
    pub fn prepare_gather_context(
        &self,
        field_ids: &[LogicalFieldId],
    ) -> Result<MultiGatherContext> {
        let mut field_infos = Vec::with_capacity(field_ids.len());
        for &fid in field_ids {
            field_infos.push((fid, self.data_type(fid)?));
        }

        if field_infos.is_empty() {
            return Ok(MultiGatherContext::new(Vec::new(), Vec::new()));
        }

        // Resolve descriptor physical keys under the catalog lock, then drop
        // it before doing pager I/O.
        let catalog = self.catalog.read().unwrap();
        let mut key_pairs = Vec::with_capacity(field_infos.len());
        for (fid, _) in &field_infos {
            let value_pk = *catalog.map.get(fid).ok_or(Error::NotFound)?;
            let row_pk = *catalog.map.get(&rowid_fid(*fid)).ok_or(Error::NotFound)?;
            key_pairs.push((value_pk, row_pk));
        }
        drop(catalog);

        // Fetch all descriptor blobs in one pager batch.
        let mut descriptor_requests = Vec::with_capacity(key_pairs.len() * 2);
        for (value_pk, row_pk) in &key_pairs {
            descriptor_requests.push(BatchGet::Raw { key: *value_pk });
            descriptor_requests.push(BatchGet::Raw { key: *row_pk });
        }
        let descriptor_results = self.pager.batch_get(&descriptor_requests)?;
        let mut descriptor_map: FxHashMap<PhysicalKey, EntryHandle> = FxHashMap::default();
        for result in descriptor_results {
            if let GetResult::Raw { key, bytes } = result {
                descriptor_map.insert(key, bytes);
            }
        }

        // Decode each descriptor pair into a FieldPlan with non-empty chunk
        // metadata for both the value and row-id columns.
        let mut plans = Vec::with_capacity(field_infos.len());
        for ((_, dtype), (value_pk, row_pk)) in field_infos.iter().zip(key_pairs.iter()) {
            let value_desc_blob = descriptor_map.remove(value_pk).ok_or(Error::NotFound)?;
            let value_desc = ColumnDescriptor::from_le_bytes(value_desc_blob.as_ref());
            let value_metas =
                Self::collect_non_empty_metas(self.pager.as_ref(), value_desc.head_page_pk)?;

            let row_desc_blob = descriptor_map.remove(row_pk).ok_or(Error::NotFound)?;
            let row_desc = ColumnDescriptor::from_le_bytes(row_desc_blob.as_ref());
            let row_metas =
                Self::collect_non_empty_metas(self.pager.as_ref(), row_desc.head_page_pk)?;

            // Value and row-id chunks must line up one-to-one.
            if value_metas.len() != row_metas.len() {
                return Err(Error::Internal(
                    "gather_rows_multi: chunk count mismatch".into(),
                ));
            }

            plans.push(FieldPlan {
                dtype: dtype.clone(),
                value_metas,
                row_metas,
                candidate_indices: Vec::new(),
            });
        }

        Ok(MultiGatherContext::new(field_infos, plans))
    }
519
    /// Gather `row_ids` using a prepared [`MultiGatherContext`], reusing its
    /// chunk cache and scratch buffers across calls.
    ///
    /// Behaves like [`Self::gather_rows`] for the context's field set, but
    /// amortizes allocation and chunk deserialization over repeated gathers.
    ///
    /// Errors if `row_ids` contains duplicates, a dtype is unsupported, or
    /// the batch cannot be assembled.
    pub fn gather_rows_with_reusable_context(
        &self,
        ctx: &mut MultiGatherContext,
        row_ids: &[u64],
        policy: GatherNullPolicy,
    ) -> Result<RecordBatch> {
        if ctx.is_empty() {
            return Ok(RecordBatch::new_empty(Arc::new(Schema::empty())));
        }

        // Empty request: build an empty batch with one nullable field per
        // projected column.
        if row_ids.is_empty() {
            let mut arrays = Vec::with_capacity(ctx.field_infos().len());
            let mut fields = Vec::with_capacity(ctx.field_infos().len());
            for (fid, dtype) in ctx.field_infos() {
                arrays.push(new_empty_array(dtype));
                let field_name = format!("field_{}", u64::from(*fid));
                fields.push(Field::new(field_name, dtype.clone(), true));
            }
            let schema = Arc::new(Schema::new(fields));
            return RecordBatch::try_new(schema, arrays)
                .map_err(|e| Error::Internal(format!("gather_rows_multi empty batch: {e}")));
        }

        // Move the reusable scratch structures out of the context so `ctx`
        // can be borrowed mutably below; they are stored back
        // unconditionally after the closure runs (even on error).
        let mut row_index = ctx.take_row_index();
        let mut row_scratch = ctx.take_row_scratch();

        let field_infos = ctx.field_infos().to_vec();
        let mut chunk_keys = ctx.take_chunk_keys();

        let result: Result<RecordBatch> = (|| {
            let len = row_ids.len();
            if row_scratch.len() < len {
                row_scratch.resize(len, None);
            }

            // Skip the sort (and the copy) when input is already ordered.
            let is_non_decreasing = len <= 1 || row_ids.windows(2).all(|w| w[0] <= w[1]);
            let sorted_row_ids_cow: Cow<'_, [u64]> = if is_non_decreasing {
                Cow::Borrowed(row_ids)
            } else {
                let mut buf = row_ids.to_vec();
                buf.sort_unstable();
                Cow::Owned(buf)
            };
            let sorted_row_ids: &[u64] = sorted_row_ids_cow.as_ref();

            // Strictly consecutive ascending ids can be located by offset
            // arithmetic instead of a hash lookup per row.
            let dense_base = if len == 0 {
                None
            } else if len == 1 || is_non_decreasing && row_ids.windows(2).all(|w| w[1] == w[0] + 1)
            {
                Some(row_ids[0])
            } else {
                None
            };

            if dense_base.is_none() {
                // Sparse path: map row id -> output slot, rejecting
                // duplicates (each slot holds exactly one row).
                row_index.clear();
                row_index.reserve(len);
                for (idx, &row_id) in row_ids.iter().enumerate() {
                    if row_index.insert(row_id, idx).is_some() {
                        return Err(Error::Internal(
                            "duplicate row_id in gather_rows_multi".into(),
                        ));
                    }
                }
            } else {
                // Dense path needs no map; clear any stale entries.
                row_index.clear();
            }

            let row_locator = if let Some(base) = dense_base {
                RowLocator::Dense { base }
            } else {
                RowLocator::Sparse { index: &row_index }
            };

            chunk_keys.clear();

            // Select candidate chunks per field and record the physical keys
            // (value + row-id chunks) that must be available.
            {
                let plans_mut = ctx.plans_mut();
                for plan in plans_mut.iter_mut() {
                    plan.candidate_indices.clear();
                    for (idx, meta) in plan.row_metas.iter().enumerate() {
                        if Self::chunk_intersects(sorted_row_ids, meta) {
                            plan.candidate_indices.push(idx);
                            chunk_keys.push(plan.value_metas[idx].chunk_pk);
                            chunk_keys.push(plan.row_metas[idx].chunk_pk);
                        }
                    }
                }
            }

            chunk_keys.sort_unstable();
            chunk_keys.dedup();

            // Fetch and deserialize only the chunks not already cached.
            {
                let mut pending: Vec<BatchGet> = Vec::new();
                {
                    let cache = ctx.chunk_cache();
                    for &key in &chunk_keys {
                        if !cache.contains_key(&key) {
                            pending.push(BatchGet::Raw { key });
                        }
                    }
                }

                if !pending.is_empty() {
                    let chunk_results = self.pager.batch_get(&pending)?;
                    let cache = ctx.chunk_cache_mut();
                    for result in chunk_results {
                        if let GetResult::Raw { key, bytes } = result {
                            let array = deserialize_array(bytes)?;
                            cache.insert(key, Arc::clone(&array));
                        }
                    }
                }
            }

            let allow_missing = policy.allow_missing();

            // Gather each field with the dtype-specific chunk-cache routine.
            let mut outputs = Vec::with_capacity(ctx.plans().len());
            for plan in ctx.plans() {
                let array = match &plan.dtype {
                    DataType::Utf8 => Self::gather_rows_from_chunks_string::<i32>(
                        row_ids,
                        row_locator,
                        len,
                        &plan.candidate_indices,
                        plan,
                        ctx.chunk_cache(),
                        &mut row_scratch,
                        allow_missing,
                    ),
                    DataType::LargeUtf8 => Self::gather_rows_from_chunks_string::<i64>(
                        row_ids,
                        row_locator,
                        len,
                        &plan.candidate_indices,
                        plan,
                        ctx.chunk_cache(),
                        &mut row_scratch,
                        allow_missing,
                    ),
                    DataType::Binary => Self::gather_rows_from_chunks_binary::<i32>(
                        row_ids,
                        row_locator,
                        len,
                        &plan.candidate_indices,
                        plan,
                        ctx.chunk_cache(),
                        &mut row_scratch,
                        allow_missing,
                    ),
                    DataType::LargeBinary => Self::gather_rows_from_chunks_binary::<i64>(
                        row_ids,
                        row_locator,
                        len,
                        &plan.candidate_indices,
                        plan,
                        ctx.chunk_cache(),
                        &mut row_scratch,
                        allow_missing,
                    ),
                    DataType::Boolean => Self::gather_rows_from_chunks_bool(
                        row_ids,
                        row_locator,
                        len,
                        &plan.candidate_indices,
                        plan,
                        ctx.chunk_cache(),
                        &mut row_scratch,
                        allow_missing,
                    ),
                    DataType::Struct(_) => Self::gather_rows_from_chunks_struct(
                        row_locator,
                        len,
                        &plan.candidate_indices,
                        plan,
                        ctx.chunk_cache(),
                        &mut row_scratch,
                        allow_missing,
                        &plan.dtype,
                    ),
                    DataType::Decimal128(_, _) => Self::gather_rows_from_chunks_decimal128(
                        row_ids,
                        row_locator,
                        len,
                        &plan.candidate_indices,
                        plan,
                        ctx.chunk_cache(),
                        &mut row_scratch,
                        allow_missing,
                        &plan.dtype,
                    ),
                    // Remaining (integer-like primitive) dtypes dispatch
                    // through the macro; anything else is unsupported.
                    other => with_integer_arrow_type!(
                        other.clone(),
                        |ArrowTy| {
                            Self::gather_rows_from_chunks::<ArrowTy>(
                                row_ids,
                                row_locator,
                                len,
                                &plan.candidate_indices,
                                plan,
                                ctx.chunk_cache(),
                                &mut row_scratch,
                                allow_missing,
                            )
                        },
                        Err(Error::Internal(format!(
                            "gather_rows_multi: unsupported dtype {:?}",
                            other
                        ))),
                    ),
                }?;
                outputs.push(array);
            }

            // DropNulls: post-filter the gathered columns.
            let outputs = if matches!(policy, GatherNullPolicy::DropNulls) {
                Self::filter_rows_with_non_null(outputs)?
            } else {
                outputs
            };

            // Field nullability mirrors the policy and observed null counts.
            let mut fields = Vec::with_capacity(field_infos.len());
            for (idx, (fid, dtype)) in field_infos.iter().enumerate() {
                let array = &outputs[idx];
                let field_name = format!("field_{}", u64::from(*fid));
                let nullable = match policy {
                    GatherNullPolicy::IncludeNulls => true,
                    _ => array.null_count() > 0,
                };
                fields.push(Field::new(field_name, dtype.clone(), nullable));
            }

            let schema = Arc::new(Schema::new(fields));
            RecordBatch::try_new(schema, outputs)
                .map_err(|e| Error::Internal(format!("gather_rows_multi batch: {e}")))
        })();

        // Return the scratch structures to the context even on error so the
        // next call can reuse their capacity.
        ctx.store_row_scratch(row_scratch);
        ctx.store_row_index(row_index);
        ctx.store_chunk_keys(chunk_keys);

        result
    }
767
768 fn collect_non_empty_metas(pager: &P, head_page_pk: PhysicalKey) -> Result<Vec<ChunkMetadata>> {
769 let mut metas = Vec::new();
770 if head_page_pk == 0 {
771 return Ok(metas);
772 }
773 for meta in DescriptorIterator::new(pager, head_page_pk) {
774 let meta = meta?;
775 if meta.row_count > 0 {
776 metas.push(meta);
777 }
778 }
779 Ok(metas)
780 }
781
    /// Cheap pre-filter: could chunk `meta` contain any id in
    /// `sorted_row_ids`?
    ///
    /// `sorted_row_ids` must be sorted ascending. May return false positives
    /// (the per-row gather re-checks), but must not return false negatives.
    #[inline]
    fn chunk_intersects(sorted_row_ids: &[u64], meta: &ChunkMetadata) -> bool {
        if sorted_row_ids.is_empty() || meta.row_count == 0 {
            return false;
        }
        let min = meta.min_val_u64;
        let max = meta.max_val_u64;
        // min == max == 0 with rows present is treated as "bounds unknown"
        // and conservatively intersects. NOTE(review): presumably an unset
        // min/max sentinel — confirm against the chunk writer.
        if min == 0 && max == 0 && meta.row_count > 0 {
            return true;
        }
        // Inverted range: likewise treated as unknown bounds.
        if min > max {
            return true;
        }
        let min_req = sorted_row_ids[0];
        let max_req = *sorted_row_ids.last().unwrap();
        // Disjoint ranges cannot intersect.
        if max < min_req || min > max_req {
            return false;
        }
        // First requested id >= chunk min; intersects iff it is also <= max.
        let idx = sorted_row_ids.partition_point(|&rid| rid < min);
        idx < sorted_row_ids.len() && sorted_row_ids[idx] <= max
    }
803
804 fn gather_rows_single_shot_string<O>(
805 row_index: &FxHashMap<u64, usize>,
806 len: usize,
807 plan: &FieldPlan,
808 chunk_blobs: &mut FxHashMap<PhysicalKey, EntryHandle>,
809 allow_missing: bool,
810 ) -> Result<ArrayRef>
811 where
812 O: OffsetSizeTrait,
813 {
814 shared_gather_rows_single_shot_string::<O>(
815 row_index,
816 len,
817 &plan.value_metas,
818 &plan.row_metas,
819 &plan.candidate_indices,
820 chunk_blobs,
821 allow_missing,
822 )
823 }
824
825 #[allow(clippy::too_many_arguments)]
826 fn gather_rows_single_shot_bool(
827 row_index: &FxHashMap<u64, usize>,
828 len: usize,
829 plan: &FieldPlan,
830 chunk_blobs: &mut FxHashMap<PhysicalKey, EntryHandle>,
831 allow_missing: bool,
832 ) -> Result<ArrayRef> {
833 shared_gather_rows_single_shot_bool(
834 row_index,
835 len,
836 &plan.value_metas,
837 &plan.row_metas,
838 &plan.candidate_indices,
839 chunk_blobs,
840 allow_missing,
841 )
842 }
843
844 fn gather_rows_single_shot_struct(
845 row_index: &FxHashMap<u64, usize>,
846 len: usize,
847 plan: &FieldPlan,
848 chunk_blobs: &mut FxHashMap<PhysicalKey, EntryHandle>,
849 allow_missing: bool,
850 dtype: &DataType,
851 ) -> Result<ArrayRef> {
852 shared_gather_rows_single_shot_struct(
853 row_index,
854 len,
855 &plan.value_metas,
856 &plan.row_metas,
857 &plan.candidate_indices,
858 chunk_blobs,
859 allow_missing,
860 dtype,
861 )
862 }
863
864 fn gather_rows_single_shot_decimal128(
865 row_index: &FxHashMap<u64, usize>,
866 len: usize,
867 plan: &FieldPlan,
868 chunk_blobs: &mut FxHashMap<PhysicalKey, EntryHandle>,
869 allow_missing: bool,
870 dtype: &DataType,
871 ) -> Result<ArrayRef> {
872 shared_gather_rows_single_shot_decimal128(
873 row_index,
874 len,
875 &plan.value_metas,
876 &plan.row_metas,
877 &plan.candidate_indices,
878 chunk_blobs,
879 allow_missing,
880 dtype,
881 )
882 }
883
884 #[allow(clippy::too_many_arguments)]
885 fn gather_rows_single_shot<T>(
886 row_index: &FxHashMap<u64, usize>,
887 len: usize,
888 plan: &FieldPlan,
889 chunk_blobs: &mut FxHashMap<PhysicalKey, EntryHandle>,
890 allow_missing: bool,
891 ) -> Result<ArrayRef>
892 where
893 T: ArrowPrimitiveType,
894 {
895 shared_gather_rows_single_shot::<T>(
896 row_index,
897 len,
898 &plan.value_metas,
899 &plan.row_metas,
900 &plan.candidate_indices,
901 chunk_blobs,
902 allow_missing,
903 )
904 }
905
906 fn gather_rows_single_shot_string_view(
907 row_index: &FxHashMap<u64, usize>,
908 len: usize,
909 plan: &FieldPlan,
910 chunk_blobs: &mut FxHashMap<PhysicalKey, EntryHandle>,
911 allow_missing: bool,
912 ) -> Result<ArrayRef> {
913 shared_gather_rows_single_shot_string_view(
914 row_index,
915 len,
916 &plan.value_metas,
917 &plan.row_metas,
918 &plan.candidate_indices,
919 chunk_blobs,
920 allow_missing,
921 )
922 }
923
924 #[allow(clippy::too_many_arguments)] fn gather_rows_from_chunks_string<O>(
926 row_ids: &[u64],
927 row_locator: RowLocator,
928 len: usize,
929 candidate_indices: &[usize],
930 plan: &FieldPlan,
931 chunk_arrays: &FxHashMap<PhysicalKey, ArrayRef>,
932 row_scratch: &mut [Option<(usize, usize)>],
933 allow_missing: bool,
934 ) -> Result<ArrayRef>
935 where
936 O: OffsetSizeTrait,
937 {
938 shared_gather_rows_from_chunks_string::<O>(
939 row_ids,
940 row_locator,
941 len,
942 candidate_indices,
943 &plan.value_metas,
944 &plan.row_metas,
945 chunk_arrays,
946 row_scratch,
947 allow_missing,
948 )
949 }
950
951 fn gather_rows_single_shot_binary<O>(
952 row_index: &FxHashMap<u64, usize>,
953 len: usize,
954 plan: &FieldPlan,
955 chunk_blobs: &mut FxHashMap<PhysicalKey, EntryHandle>,
956 allow_missing: bool,
957 ) -> Result<ArrayRef>
958 where
959 O: OffsetSizeTrait,
960 {
961 shared_gather_rows_single_shot_binary::<O>(
962 row_index,
963 len,
964 &plan.value_metas,
965 &plan.row_metas,
966 &plan.candidate_indices,
967 chunk_blobs,
968 allow_missing,
969 )
970 }
971
972 #[allow(clippy::too_many_arguments)] fn gather_rows_from_chunks_binary<O>(
974 row_ids: &[u64],
975 row_locator: RowLocator,
976 len: usize,
977 candidate_indices: &[usize],
978 plan: &FieldPlan,
979 chunk_arrays: &FxHashMap<PhysicalKey, ArrayRef>,
980 row_scratch: &mut [Option<(usize, usize)>],
981 allow_missing: bool,
982 ) -> Result<ArrayRef>
983 where
984 O: OffsetSizeTrait,
985 {
986 shared_gather_rows_from_chunks_binary::<O>(
987 row_ids,
988 row_locator,
989 len,
990 candidate_indices,
991 &plan.value_metas,
992 &plan.row_metas,
993 chunk_arrays,
994 row_scratch,
995 allow_missing,
996 )
997 }
998
999 #[allow(clippy::too_many_arguments)] fn gather_rows_from_chunks_bool(
1001 row_ids: &[u64],
1002 row_locator: RowLocator,
1003 len: usize,
1004 candidate_indices: &[usize],
1005 plan: &FieldPlan,
1006 chunk_arrays: &FxHashMap<PhysicalKey, ArrayRef>,
1007 row_scratch: &mut [Option<(usize, usize)>],
1008 allow_missing: bool,
1009 ) -> Result<ArrayRef> {
1010 shared_gather_rows_from_chunks_bool(
1011 row_ids,
1012 row_locator,
1013 len,
1014 candidate_indices,
1015 &plan.value_metas,
1016 &plan.row_metas,
1017 chunk_arrays,
1018 row_scratch,
1019 allow_missing,
1020 )
1021 }
1022
1023 #[allow(clippy::too_many_arguments)] fn gather_rows_from_chunks_struct(
1025 row_locator: RowLocator,
1026 len: usize,
1027 candidate_indices: &[usize],
1028 plan: &FieldPlan,
1029 chunk_arrays: &FxHashMap<PhysicalKey, ArrayRef>,
1030 row_scratch: &mut [Option<(usize, usize)>],
1031 allow_missing: bool,
1032 dtype: &DataType,
1033 ) -> Result<ArrayRef> {
1034 shared_gather_rows_from_chunks_struct(
1035 row_locator,
1036 len,
1037 candidate_indices,
1038 &plan.value_metas,
1039 &plan.row_metas,
1040 chunk_arrays,
1041 row_scratch,
1042 allow_missing,
1043 dtype,
1044 )
1045 }
1046
1047 #[allow(clippy::too_many_arguments)] fn gather_rows_from_chunks_decimal128(
1049 row_ids: &[u64],
1050 row_locator: RowLocator,
1051 len: usize,
1052 candidate_indices: &[usize],
1053 plan: &FieldPlan,
1054 chunk_arrays: &FxHashMap<PhysicalKey, ArrayRef>,
1055 row_scratch: &mut [Option<(usize, usize)>],
1056 allow_missing: bool,
1057 dtype: &DataType,
1058 ) -> Result<ArrayRef> {
1059 shared_gather_rows_from_chunks_decimal128(
1060 row_ids,
1061 row_locator,
1062 len,
1063 candidate_indices,
1064 &plan.value_metas,
1065 &plan.row_metas,
1066 chunk_arrays,
1067 row_scratch,
1068 allow_missing,
1069 dtype,
1070 )
1071 }
1072
1073 #[allow(clippy::too_many_arguments)] fn gather_rows_from_chunks<T>(
1075 row_ids: &[u64],
1076 row_locator: RowLocator,
1077 len: usize,
1078 candidate_indices: &[usize],
1079 plan: &FieldPlan,
1080 chunk_arrays: &FxHashMap<PhysicalKey, ArrayRef>,
1081 row_scratch: &mut [Option<(usize, usize)>],
1082 allow_missing: bool,
1083 ) -> Result<ArrayRef>
1084 where
1085 T: ArrowPrimitiveType,
1086 {
1087 shared_gather_rows_from_chunks::<T>(
1088 row_ids,
1089 row_locator,
1090 len,
1091 candidate_indices,
1092 &plan.value_metas,
1093 &plan.row_metas,
1094 chunk_arrays,
1095 row_scratch,
1096 allow_missing,
1097 )
1098 }
1099
    /// Delegate to the shared DropNulls post-filter (applied after gathering
    /// when the policy is `GatherNullPolicy::DropNulls`).
    fn filter_rows_with_non_null(columns: Vec<ArrayRef>) -> Result<Vec<ArrayRef>> {
        shared_filter_rows_with_non_null(columns)
    }
1103}